介绍

本章节介绍如何远程用Confluent Platform软件包作为客户端创建Cluster Linking以及Cluster Linking的基本使用方式。
要查看哪些集群可以使用Cluster Linking,请参阅支持的集群类型

前提条件

  • 您已经准备好满足Cluster Linking使用版本条件的source集群和destination集群。Cluster Linking将在Destination集群启动,并复制source集群的数据到destination集群
  • 文章示例假设您的source和destination集群都是阿里云流数据服务Confluent平台,并且都开通了公网访问服务,示例将通过公网来进行网络链接。
  • 本文假设source集群和目标集群需要用SASL_SSL的方式登陆集群,并且假设连接集群的时候,使用了证书进行域名校验。
  • 您在本地安装了Confluent Platform 7.0.0 或更高版本作为客户端,以及 Java 1.8 或 1.11。您可以参考部署文档来下载安装Confluent Platform软件包。
  • 您也可以使用Confluent提供的RestProxy服务,通过API调用的方式创建和管理Cluster Linking。

    客户端环境配置

    本文以Confluent Platform 7.1.0作为客户端环境。source集群和目标集群需要用SASL_SSL的方式登陆集群,并且校验证书域名。

  • 配置客户端环境变量

将以下配置加入您的.bashrc或者.bash_profile配置文件里

  1. #CP客户端安装位置
  2. export CONFLUENT_HOME=<CP installation directory>
  3. export CONFLUENT_CONFIG=$CONFLUENT_HOME/etc/kafka
  4. export PATH=${CONFLUENT_HOME}/bin:$PATH
  5. #source集群的bootstrap server地址
  6. export SOURCE_ADDRESS=<your source cluster access address>
  7. #destination集群的bootstrap server地址
  8. export DESTINATION_ADDRESS=<your destination cluster access address>
  • 配置source集群访问文件${CONFLUENT_CONFIG}/source.config

    bootstrap.servers=<your source cluster access address>
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<username>'  password='<password>';
    sasl.mechanism=PLAIN
    ssl.truststore.location=<your source cluster truststore location>
    ssl.truststore.password=<your source cluster truststore password>
    
  • 配置destination集群访问文件${CONFLUENT_CONFIG}/destination.config

    bootstrap.servers=<your destination cluster access address>
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<username>'  password='<password>';
    sasl.mechanism=PLAIN
    ssl.truststore.location=<your destination cluster truststore location>
    ssl.truststore.password=<your destination cluster truststore password>
    
  • 配置cluster link配置文件${CONFLUENT_CONFIG}/clusterlink.config

    bootstrap.servers=<your source cluster access address>
    security.protocol=SASL_SSL
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<username>'  password='<password>';
    sasl.mechanism=PLAIN
    ssl.truststore.type=PEM
    ssl.truststore.certificates=<your source cluster certificates content>
    

    在Source集群上准备测试数据

    使用Confluent Platform CLI执行相关命令,准备source集群上的测试数据。

  1. 在source集群上创建一个具有单分区的待镜像的Topic,以便更容易观察复制消息的顺序。

    kafka-topics --create --topic demo-link-topic --partitions 1 --bootstrap-server ${SOURCE_ADDRESS} --command-config ${CONFLUENT_CONFIG}/source.config
    

    确认Topic已经创建成功

    #list topic
    kafka-topics --list --bootstrap-server ${SOURCE_ADDRESS} --command-config ${CONFLUENT_CONFIG}/source.config
    #describe topic
    kafka-topics --describe --topic demo-link-topic --bootstrap-server ${SOURCE_ADDRESS} --command-config ${CONFLUENT_CONFIG}/source.config
    
  2. 向source集群上的测试Topic:demo-link-topic发送消息。

    seq 1 5 | kafka-console-producer --topic demo-link-topic --bootstrap-server ${SOURCE_ADDRESS} --producer.config ${CONFLUENT_CONFIG}/source.config
    
  3. 从source集群上的Topic:demo-link-topic消费.

    kafka-console-consumer --topic demo-link-topic --from-beginning --bootstrap-server ${SOURCE_ADDRESS} --consumer.config ${CONFLUENT_CONFIG}/source.config
    

    如果Topic成功消费消息,您的输出将是:

1
2
3
4
5

使用键盘命令 Ctrl+C终止kafka-console-consumer命令。

将数据从source集群镜像到destination集群

本章节描述如何设置和测试cluster link。

  1. 创建从source集群到destination集群的cluster link:demo-link。

    kafka-cluster-links --command-config $CONFLUENT_CONFIG/destination.config --bootstrap-server ${DESTINATION_ADDRESS} --create --link demo-link --config-file ${CONFLUENT_CONFIG}/clusterlink.config
    
  2. 使用命令检查cluster link是否存在

    kafka-cluster-links --command-config ${CONFLUENT_CONFIG}/destination.config --bootstrap-server ${DESTINATION_ADDRESS} --list
    
  3. 查看cluster link的配置

    kafka-configs --describe --cluster-link demo-link --bootstrap-server $DESTINATION_ADDRESS --command-config ${CONFLUENT_CONFIG}/destination.config
    
  4. 创建Mirror topic

    kafka-mirrors --create --mirror-topic demo-link-topic --link demo-link --bootstrap-server ${DESTINATION_ADDRESS} --command-config $CONFLUENT_CONFIG/destination.config
    
  5. 查看Mirror topic

    kafka-mirrors --describe --link demo-link --bootstrap-server ${DESTINATION_ADDRESS} --command-config ${CONFLUENT_CONFIG}/destination.config
    
  6. 在destination集群消费Mirror topic数据

    kafka-console-consumer --topic demo-link-topic --from-beginning --bootstrap-server ${DESTINATION_ADDRESS} --consumer.config ${CONFLUENT_CONFIG}/destination.config
    

    如果Topic成功消费消息,您的输出将是:

1
2
3
4
5

使用键盘命令 Ctrl+C终止kafka-console-consumer命令。

  1. 查看cluster link的复制状态
    kafka-replica-status --topics demo-link-topic --include-linked --bootstrap-server ${DESTINATION_ADDRESS} --admin.config ${CONFLUENT_CONFIG}/destination.config
    

    下线Cluster link

  • 停止消费者和生产者

在各自的命令窗口中用Ctl-C停止消费者和生产者。

  • 转换mirror topic为普通topic

    kafka-mirrors --promote --topics demo-link-topic --bootstrap-server $DESTINATION_ADDRESS --command-config ${CONFLUENT_CONFIG}/destination.config
    

    输出结果:

    Calculating max offset and ms lag for mirror topics: [demo-link-topic]
    Finished calculating max offset lag and max lag ms for mirror topics: [demo-link-topic]
    Request for stopping topic demo-link-topic's mirror was successfully scheduled. Please use the describe command with the --pending-stopped-only option to monitor progress.
    
  • 删除source集群和destination集群的测试topic

    kafka-topics --delete --topic demo-link-topic --bootstrap-server ${DESTINATION_ADDRESS} --command-config ${CONFLUENT_CONFIG}/destination.config
    kafka-topics --delete --topic demo-link-topic --bootstrap-server ${SOURCE_ADDRESS} --command-config ${CONFLUENT_CONFIG}/source.config
    
  • 删除cluster link

  1. 查看cluster link列表

    kafka-cluster-links --list --bootstrap-server ${DESTINATION_ADDRESS} --command-config ${CONFLUENT_CONFIG}/destination.config
    
  2. 删除cluster link

    kafka-cluster-links --delete --link demo-link --bootstrap-server ${DESTINATION_ADDRESS} --command-config ${CONFLUENT_CONFIG}/destination.config
    

    输出结果:

    Cluster link 'demo-link' deletion successfully completed.
    

    API管理Cluster Linking

    Cluster Linking创建

    Cluster Linking支持Destination Initialed (DI)和Source Initialed (SI)两种方式创建方式。以下示例Source Cluster为Confluent Platform On Alibaba Cloud, Destination Cluster为Confluent Cloud。

  • DestinationInitialed方式在Destination集群侧创建端到端的Cluster Link对象,建立Link Connection,Source集群需要对Destination集群开放Kafka Broker服务端口。

Cluster Linking基本使用 - 图1

# 1 - 创建Destination Initialed Link
# destination_cluster_endpoint    示例:pkc-37p9m.us-east4.gcp.confluent.cloud:443 (阿里云Confluent格式为rb-ab9b2deb067846e3-internal.csp.aliyuncs.com:8090)
# destination_cluster_id    示例:lkc-9k5jqy
#    link_name    示例:link-di3
# source_cluster_id    示例:MToL0AbqRMqxznu7C7CPOw
# source_cluster_address    示例:rb-ab9b2deb067846e3.csp.aliyuncs.com:9092
# 返回201 Created即为创建成功

curl --location --request POST 'https://{{destination_cluster_endpoint}}/kafka/v3/clusters/{{destination_cluster_id}}/links?link_name={{link_name}}' \
--header 'Authorization: Basic VFhaT1VTU1VaS1YzUlhaSDorWnJWVlFHN3BPcFllLzBrWFliS2ROWlI5U1ZUcXhPWHN0ZHhIVitLY0g2MEVKTUsyQTVZcW9CNXllb05GemVV' \
--header 'Content-Type: application/json' \
--data-raw '{
"source_cluster_id": "{{source_cluster_id}}",
"configs": [
  {
    "name": "bootstrap.servers",
    "value": "{{source_cluster_address}}"
  },
  {
    "name": "security.protocol",
    "value": "SASL_SSL"
  },
  {
    "name": "sasl.mechanism",
    "value": "PLAIN"
  },
  {
    "name": "sasl.jaas.config",
    "value": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{{source_cluster_username}}\" password=\"{{source_cluster_password}}\";"
  }
]
}'

# 2 - 创建Mirror Topic(确保source topic已存在)
curl --location --request POST 'https://{{destination_cluster_endpoint}}/kafka/v3/clusters/{{destination_cluster_id}}/links/{{link_name}}/mirrors' \
--header 'Authorization: Basic VFhaT1VTU1VaS1YzUlhaSDorWnJWVlFHN3BPcFllLzBrWFliS2ROWlI5U1ZUcXhPWHN0ZHhIVitLY0g2MEVKTUsyQTVZcW9CNXllb05GemVV' \
--header 'Content-Type: application/json' \
--data-raw '{
"source_topic_name": "test-di1",
"configs": [

]
}'
  • SourceInitialed方式需要在Destination集群侧创建Cluster Link对象,然后在Source集群创建到Destination集群对应Cluster Link对象,建立Link Connection,Destination集群需要对Source集群开放Kafka Broker服务端口。

Cluster Linking基本使用 - 图2

# 1 - 创建Source Initialed Link at Destination
# destination_cluster_endpoint    示例:pkc-37p9m.us-east4.gcp.confluent.cloud:443
# destination_cluster_id    示例:lkc-9k5jqy
#    link_name    示例:link-di3
# source_cluster_id    示例:MToL0AbqRMqxznu7C7CPOw
# 返回201 Created即为创建成功

curl --location --request POST 'https://{{destination_cluster_endpoint}}/kafka/v3/clusters/{{lkc-9k5jqy}}/links?link_name={{link_name}}' \
--header 'Authorization: Basic VFhaT1VTU1VaS1YzUlhaSDorWnJWVlFHN3BPcFllLzBrWFliS2ROWlI5U1ZUcXhPWHN0ZHhIVitLY0g2MEVKTUsyQTVZcW9CNXllb05GemVV' \
--header 'Content-Type: application/json' \
--data-raw '{
"source_cluster_id": "{{MToL0AbqRMqxznu7C7CPOw}}",
"configs": [
  {
    "name": "link.mode",
    "value": "DESTINATION"
  },
  {
    "name": "connection.mode",
    "value": "INBOUND"
  }
]
}'

# 2 - 创建Source Initialed Link at Source
# source_cluster_endpoint    示例:rb-ab9b2deb067846e3-internal.csp.aliyuncs.com:8090
# local_bootstrap_servers    示例:kafka.confluent.svc.cluster.local.c0f3f3925138a4a3c8e7efedbb6c053bd:9071
# destination_cluster_address    示例:pkc-37p9m.us-east4.gcp.confluent.cloud:9092
#    link_name    示例:link-di3
# source_cluster_id    示例:MToL0AbqRMqxznu7C7CPOw

curl --location --request POST 'https://{{source_cluster_endpoint}}/kafka/v3/clusters/{{source_cluster_id}}/links?link_name={{link_name}}' \
--header 'Authorization: Basic YWRtaW46YWRtaW4=' \
--header 'Content-Type: application/json' \
--data-raw '{
"destination_cluster_id": "lkc-9k5jqy",
"configs": [
  {
    "name": "bootstrap.servers",
    "value": "{{destination_cluster_address}}"
  },
  {
    "name": "link.mode",
    "value": "SOURCE"
  },
  {
    "name": "connection.mode",
    "value": "OUTBOUND"
  },
  {
    "name": "security.protocol",
    "value": "SASL_SSL"
  },
  {
    "name": "sasl.mechanism",
    "value": "PLAIN"
  },
  {
    "name": "sasl.jaas.config",
    "value": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{{destination_cluster_username}}\" password=\"{{destination_cluster_password}}\";"
  },
  {
    "name": "local.bootstrap.servers",
    "value": "{{local_bootstrap_servers}}"
  },
  {
    "name": "local.security.protocol",
    "value": "SASL_SSL"
  },
  {
    "name": "local.sasl.mechanism",
    "value": "PLAIN"
  },
  {
    "name": "local.sasl.jaas.config",
    "value": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{{source_cluster_username}}\" password=\"{{source_cluster_password}}\";"
  }
]
}'

# 3 - 创建Mirror Topic(确保source topic已存在)
curl --location --request POST 'https://{{destination_cluster_endpoint}}/kafka/v3/clusters/{{destination_cluster_id}}/links/{{link_name}}/mirrors' \
--header 'Authorization: Basic VFhaT1VTU1VaS1YzUlhaSDorWnJWVlFHN3BPcFllLzBrWFliS2ROWlI5U1ZUcXhPWHN0ZHhIVitLY0g2MEVKTUsyQTVZcW9CNXllb05GemVV' \
--header 'Content-Type: application/json' \
--data-raw '{
"source_topic_name": "test-si1",
"configs": [

]
}'

Cluster Linking管理

# 1 - 获取当前集群Cluster Linking
# cluster_endpoint    示例:rb-b89faf8f4ef1fa22-internal.csp.aliyuncs.com:8090
# cluster_id    示例:RCJUOzrpQIqBZkxfjx63Hw

curl --location --request GET 'https://{{cluster_endpoint}}/kafka/v3/clusters/{{cluster_id}}/links' \
--header 'Content-Type: application/json' \
--header 'Authorization: Basic YWRtaW46YWRtaW4='

response:
{
    "kind": "KafkaLinkDataList",
    "metadata": {
        "self": "https://rb-b89faf8f4ef1fa22-internal.csp.aliyuncs.com:8090/kafka/v3/clusters/RCJUOzrpQIqBZkxfjx63Hw/links",
        "next": null
    },
    "data": [
        {
            "kind": "KafkaLinkData",
            "metadata": {
                "self": "https://rb-b89faf8f4ef1fa22-internal.csp.aliyuncs.com:8090/kafka/v3/clusters/RCJUOzrpQIqBZkxfjx63Hw/links/link-si1"
            },
            "source_cluster_id": "MToL0AbqRMqxznu7C7CPOw",
            "destination_cluster_id": null,
            "link_name": "link-si1",
            "link_id": "e6af9bc0-c3ff-4d50-b110-092d83ed37cc",
            "cluster_link_id": "5q-bwMP_TVCxEAktg-03zA",
            "topic_names": [
                "test-si1"
            ]
        },
        {
            "kind": "KafkaLinkData",
            "metadata": {
                "self": "https://rb-b89faf8f4ef1fa22-internal.csp.aliyuncs.com:8090/kafka/v3/clusters/RCJUOzrpQIqBZkxfjx63Hw/links/link-di1"
            },
            "source_cluster_id": "MToL0AbqRMqxznu7C7CPOw",
            "destination_cluster_id": null,
            "link_name": "link-di1",
            "link_id": "d8b5084e-9082-4fe4-8336-d9f7290f0a6b",
            "cluster_link_id": "2LUITpCCT-SDNtn3KQ8Kaw",
            "topic_names": [
                "test1"
            ]
        }
    ]
}

# 2 - 删除Cluster Linking
# cluster_endpoint    示例:rb-b89faf8f4ef1fa22-internal.csp.aliyuncs.com:8090
# cluster_id    示例:RCJUOzrpQIqBZkxfjx63Hw
#    link_name    示例:link-di1

curl --location --request DELETE 'https://{{cluster_endpoint}}/kafka/v3/clusters/{{cluster_id}}/links/{{link_name}}' \
--header 'Authorization: Basic YWRtaW46YWRtaW4=' \


参考资料