Introduction
This section describes how to create a cluster link remotely using the Confluent Platform package as a client, and covers basic Cluster Linking usage.
To see which cluster types can use Cluster Linking, refer to the supported cluster types.
Prerequisites
- You have prepared a source cluster and a destination cluster that meet the version requirements for Cluster Linking. The cluster link is started on the destination cluster and replicates data from the source cluster to the destination cluster.
- The examples assume that both your source and destination clusters are Alibaba Cloud Confluent streaming data service instances with public network access enabled; the examples connect over the public network.
- This article assumes that both the source and destination clusters require SASL_SSL authentication, and that certificates are used for hostname verification when connecting.
- You have installed Confluent Platform 7.0.0 or later locally as the client, along with Java 1.8 or 1.11. See the deployment documentation for downloading and installing the Confluent Platform package.
You can also use Confluent's REST Proxy service to create and manage cluster links through API calls.
Client Environment Setup
This article uses Confluent Platform 7.1.0 as the client environment. Both the source and destination clusters require SASL_SSL authentication and verify certificate hostnames.
Configure client environment variables
Add the following configuration to your .bashrc or .bash_profile file:
# CP client installation location
export CONFLUENT_HOME=<CP installation directory>
export CONFLUENT_CONFIG=$CONFLUENT_HOME/etc/kafka
export PATH=${CONFLUENT_HOME}/bin:$PATH
# Source cluster bootstrap server address
export SOURCE_ADDRESS=<your source cluster access address>
# Destination cluster bootstrap server address
export DESTINATION_ADDRESS=<your destination cluster access address>
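Before moving on, you can sanity-check that all four variables are set. The helper below is a small illustrative sketch, not part of Confluent Platform:

```shell
# Illustrative helper: report any unset connection variables before running
# the Cluster Linking commands in this walkthrough.
check_env() {
  missing=""
  [ -n "${CONFLUENT_HOME:-}" ]      || missing="$missing CONFLUENT_HOME"
  [ -n "${CONFLUENT_CONFIG:-}" ]    || missing="$missing CONFLUENT_CONFIG"
  [ -n "${SOURCE_ADDRESS:-}" ]      || missing="$missing SOURCE_ADDRESS"
  [ -n "${DESTINATION_ADDRESS:-}" ] || missing="$missing DESTINATION_ADDRESS"
  if [ -n "$missing" ]; then
    echo "missing:$missing"
    return 1
  fi
  echo "environment ok"
}
```

With all four variables set, `check_env` prints "environment ok" and returns 0; otherwise it names each missing variable and returns 1.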
Create the source cluster access file ${CONFLUENT_CONFIG}/source.config:
bootstrap.servers=<your source cluster access address>
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<username>' password='<password>';
sasl.mechanism=PLAIN
ssl.truststore.location=<your source cluster truststore location>
ssl.truststore.password=<your source cluster truststore password>
Create the destination cluster access file ${CONFLUENT_CONFIG}/destination.config:
bootstrap.servers=<your destination cluster access address>
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<username>' password='<password>';
sasl.mechanism=PLAIN
ssl.truststore.location=<your destination cluster truststore location>
ssl.truststore.password=<your destination cluster truststore password>
Create the cluster link configuration file ${CONFLUENT_CONFIG}/clusterlink.config:
bootstrap.servers=<your source cluster access address>
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<username>' password='<password>';
sasl.mechanism=PLAIN
ssl.truststore.type=PEM
ssl.truststore.certificates=<your source cluster certificates content>
Prepare Test Data on the Source Cluster
Use the Confluent Platform CLI to run the following commands and prepare the test data on the source cluster.
Create a single-partition topic on the source cluster to be mirrored, so that the order of replicated messages is easier to observe.
kafka-topics --create --topic demo-link-topic --partitions 1 --bootstrap-server ${SOURCE_ADDRESS} --command-config ${CONFLUENT_CONFIG}/source.config
Confirm that the topic was created successfully:
# List topics
kafka-topics --list --bootstrap-server ${SOURCE_ADDRESS} --command-config ${CONFLUENT_CONFIG}/source.config
# Describe the topic
kafka-topics --describe --topic demo-link-topic --bootstrap-server ${SOURCE_ADDRESS} --command-config ${CONFLUENT_CONFIG}/source.config
Send messages to the test topic demo-link-topic on the source cluster:
seq 1 5 | kafka-console-producer --topic demo-link-topic --bootstrap-server ${SOURCE_ADDRESS} --producer.config ${CONFLUENT_CONFIG}/source.config
Consume from the topic demo-link-topic on the source cluster:
kafka-console-consumer --topic demo-link-topic --from-beginning --bootstrap-server ${SOURCE_ADDRESS} --consumer.config ${CONFLUENT_CONFIG}/source.config
If the messages are consumed successfully, the output will be:
1
2
3
4
5
Press Ctrl+C to terminate the kafka-console-consumer command.
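The commands above repeat the same connection flags on every invocation. If you script many of them, a small helper (purely illustrative, not a Confluent convention) keeps the flags in one place:

```shell
# Illustrative: bundle the per-cluster connection flags so a command reads
# as "kafka-topics --list $(src_flags)" instead of repeating both options.
src_flags() {
  echo "--bootstrap-server ${SOURCE_ADDRESS} --command-config ${CONFLUENT_CONFIG}/source.config"
}
dst_flags() {
  echo "--bootstrap-server ${DESTINATION_ADDRESS} --command-config ${CONFLUENT_CONFIG}/destination.config"
}
```

For example: `kafka-topics --describe --topic demo-link-topic $(src_flags)`. Note that shell word splitting applies, so this only works while the paths contain no spaces.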
Mirror Data from the Source Cluster to the Destination Cluster
This section describes how to set up and test the cluster link.
Create the cluster link demo-link from the source cluster to the destination cluster:
kafka-cluster-links --command-config ${CONFLUENT_CONFIG}/destination.config --bootstrap-server ${DESTINATION_ADDRESS} --create --link demo-link --config-file ${CONFLUENT_CONFIG}/clusterlink.config
Check that the cluster link exists:
kafka-cluster-links --command-config ${CONFLUENT_CONFIG}/destination.config --bootstrap-server ${DESTINATION_ADDRESS} --list
View the cluster link's configuration:
kafka-configs --describe --cluster-link demo-link --bootstrap-server ${DESTINATION_ADDRESS} --command-config ${CONFLUENT_CONFIG}/destination.config
Create the mirror topic:
kafka-mirrors --create --mirror-topic demo-link-topic --link demo-link --bootstrap-server ${DESTINATION_ADDRESS} --command-config ${CONFLUENT_CONFIG}/destination.config
Describe the mirror topic:
kafka-mirrors --describe --link demo-link --bootstrap-server ${DESTINATION_ADDRESS} --command-config ${CONFLUENT_CONFIG}/destination.config
Consume the mirror topic's data on the destination cluster:
kafka-console-consumer --topic demo-link-topic --from-beginning --bootstrap-server ${DESTINATION_ADDRESS} --consumer.config ${CONFLUENT_CONFIG}/destination.config
If the messages are consumed successfully, the output will be:
1
2
3
4
5
Press Ctrl+C to terminate the kafka-console-consumer command.
- Check the cluster link's replication status:
kafka-replica-status --topics demo-link-topic --include-linked --bootstrap-server ${DESTINATION_ADDRESS} --admin.config ${CONFLUENT_CONFIG}/destination.config
Take the Cluster Link Offline
- Stop the consumers and producers
Stop the consumers and producers with Ctrl+C in their respective terminal windows.
Convert the mirror topic to a regular topic:
kafka-mirrors --promote --topics demo-link-topic --bootstrap-server ${DESTINATION_ADDRESS} --command-config ${CONFLUENT_CONFIG}/destination.config
Output:
Calculating max offset and ms lag for mirror topics: [demo-link-topic]
Finished calculating max offset lag and max lag ms for mirror topics: [demo-link-topic]
Request for stopping topic demo-link-topic's mirror was successfully scheduled. Please use the describe command with the --pending-stopped-only option to monitor progress.
Delete the test topics on the source and destination clusters:
kafka-topics --delete --topic demo-link-topic --bootstrap-server ${DESTINATION_ADDRESS} --command-config ${CONFLUENT_CONFIG}/destination.config
kafka-topics --delete --topic demo-link-topic --bootstrap-server ${SOURCE_ADDRESS} --command-config ${CONFLUENT_CONFIG}/source.config
Delete the cluster link
List the cluster links:
kafka-cluster-links --list --bootstrap-server ${DESTINATION_ADDRESS} --command-config ${CONFLUENT_CONFIG}/destination.config
Delete the cluster link:
kafka-cluster-links --delete --link demo-link --bootstrap-server ${DESTINATION_ADDRESS} --command-config ${CONFLUENT_CONFIG}/destination.config
Output:
Cluster link 'demo-link' deletion successfully completed.
Managing Cluster Linking via the API
Creating a Cluster Link
Cluster Linking supports two creation modes: Destination Initiated (DI) and Source Initiated (SI). In the examples below, the source cluster is Confluent Platform on Alibaba Cloud and the destination cluster is Confluent Cloud.
- Destination Initiated (DI): an end-to-end cluster link object is created on the destination cluster side to establish the link connection; the source cluster must expose its Kafka broker service port to the destination cluster.

# 1 - Create the Destination Initiated link
# destination_cluster_endpoint example: pkc-37p9m.us-east4.gcp.confluent.cloud:443 (Alibaba Cloud Confluent format: rb-ab9b2deb067846e3-internal.csp.aliyuncs.com:8090)
# destination_cluster_id example: lkc-9k5jqy
# link_name example: link-di3
# source_cluster_id example: MToL0AbqRMqxznu7C7CPOw
# source_cluster_address example: rb-ab9b2deb067846e3.csp.aliyuncs.com:9092
# A 201 Created response indicates success
curl --location --request POST 'https://{{destination_cluster_endpoint}}/kafka/v3/clusters/{{destination_cluster_id}}/links?link_name={{link_name}}' \
--header 'Authorization: Basic VFhaT1VTU1VaS1YzUlhaSDorWnJWVlFHN3BPcFllLzBrWFliS2ROWlI5U1ZUcXhPWHN0ZHhIVitLY0g2MEVKTUsyQTVZcW9CNXllb05GemVV' \
--header 'Content-Type: application/json' \
--data-raw '{
"source_cluster_id": "{{source_cluster_id}}",
"configs": [
{
"name": "bootstrap.servers",
"value": "{{source_cluster_address}}"
},
{
"name": "security.protocol",
"value": "SASL_SSL"
},
{
"name": "sasl.mechanism",
"value": "PLAIN"
},
{
"name": "sasl.jaas.config",
"value": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{{source_cluster_username}}\" password=\"{{source_cluster_password}}\";"
}
]
}'
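The comment above notes that a 201 Created response indicates success. When scripting these calls, you can capture the status code with curl's `-w '%{http_code}'` option and check it. The helper below is an illustrative sketch, not part of Confluent's tooling:

```shell
# Illustrative: fail loudly unless the REST call returned 201 Created.
expect_201() {
  # $1: HTTP status code captured via curl -w '%{http_code}'
  if [ "$1" = "201" ]; then
    echo "created"
  else
    echo "request failed: HTTP $1" >&2
    return 1
  fi
}

# Sketch of intended use (endpoint and payload as in the curl call above):
# status=$(curl -s -o /dev/null -w '%{http_code}' --request POST ...)
# expect_201 "$status"
```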
# 2 - Create the mirror topic (the source topic must already exist)
curl --location --request POST 'https://{{destination_cluster_endpoint}}/kafka/v3/clusters/{{destination_cluster_id}}/links/{{link_name}}/mirrors' \
--header 'Authorization: Basic VFhaT1VTU1VaS1YzUlhaSDorWnJWVlFHN3BPcFllLzBrWFliS2ROWlI5U1ZUcXhPWHN0ZHhIVitLY0g2MEVKTUsyQTVZcW9CNXllb05GemVV' \
--header 'Content-Type: application/json' \
--data-raw '{
"source_topic_name": "test-di1",
"configs": [
]
}'
- Source Initiated (SI): a cluster link object is first created on the destination cluster side, then a corresponding cluster link object is created on the source cluster to establish the link connection; the destination cluster must expose its Kafka broker service port to the source cluster.

# 1 - Create the Source Initiated link on the destination cluster
# destination_cluster_endpoint example: pkc-37p9m.us-east4.gcp.confluent.cloud:443
# destination_cluster_id example: lkc-9k5jqy
# link_name example: link-si1
# source_cluster_id example: MToL0AbqRMqxznu7C7CPOw
# A 201 Created response indicates success
curl --location --request POST 'https://{{destination_cluster_endpoint}}/kafka/v3/clusters/{{destination_cluster_id}}/links?link_name={{link_name}}' \
--header 'Authorization: Basic VFhaT1VTU1VaS1YzUlhaSDorWnJWVlFHN3BPcFllLzBrWFliS2ROWlI5U1ZUcXhPWHN0ZHhIVitLY0g2MEVKTUsyQTVZcW9CNXllb05GemVV' \
--header 'Content-Type: application/json' \
--data-raw '{
"source_cluster_id": "{{source_cluster_id}}",
"configs": [
{
"name": "link.mode",
"value": "DESTINATION"
},
{
"name": "connection.mode",
"value": "INBOUND"
}
]
}'
# 2 - Create the Source Initiated link on the source cluster
# source_cluster_endpoint example: rb-ab9b2deb067846e3-internal.csp.aliyuncs.com:8090
# local_bootstrap_servers example: kafka.confluent.svc.cluster.local.c0f3f3925138a4a3c8e7efedbb6c053bd:9071
# destination_cluster_address example: pkc-37p9m.us-east4.gcp.confluent.cloud:9092
# destination_cluster_id example: lkc-9k5jqy
# link_name example: link-si1
# source_cluster_id example: MToL0AbqRMqxznu7C7CPOw
curl --location --request POST 'https://{{source_cluster_endpoint}}/kafka/v3/clusters/{{source_cluster_id}}/links?link_name={{link_name}}' \
--header 'Authorization: Basic YWRtaW46YWRtaW4=' \
--header 'Content-Type: application/json' \
--data-raw '{
"destination_cluster_id": "{{destination_cluster_id}}",
"configs": [
{
"name": "bootstrap.servers",
"value": "{{destination_cluster_address}}"
},
{
"name": "link.mode",
"value": "SOURCE"
},
{
"name": "connection.mode",
"value": "OUTBOUND"
},
{
"name": "security.protocol",
"value": "SASL_SSL"
},
{
"name": "sasl.mechanism",
"value": "PLAIN"
},
{
"name": "sasl.jaas.config",
"value": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{{destination_cluster_username}}\" password=\"{{destination_cluster_password}}\";"
},
{
"name": "local.bootstrap.servers",
"value": "{{local_bootstrap_servers}}"
},
{
"name": "local.security.protocol",
"value": "SASL_SSL"
},
{
"name": "local.sasl.mechanism",
"value": "PLAIN"
},
{
"name": "local.sasl.jaas.config",
"value": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{{source_cluster_username}}\" password=\"{{source_cluster_password}}\";"
}
]
}'
# 3 - Create the mirror topic (the source topic must already exist)
curl --location --request POST 'https://{{destination_cluster_endpoint}}/kafka/v3/clusters/{{destination_cluster_id}}/links/{{link_name}}/mirrors' \
--header 'Authorization: Basic VFhaT1VTU1VaS1YzUlhaSDorWnJWVlFHN3BPcFllLzBrWFliS2ROWlI5U1ZUcXhPWHN0ZHhIVitLY0g2MEVKTUsyQTVZcW9CNXllb05GemVV' \
--header 'Content-Type: application/json' \
--data-raw '{
"source_topic_name": "test-si1",
"configs": [
]
}'
Managing Cluster Links
# 1 - List the current cluster's links
# cluster_endpoint example: rb-b89faf8f4ef1fa22-internal.csp.aliyuncs.com:8090
# cluster_id example: RCJUOzrpQIqBZkxfjx63Hw
curl --location --request GET 'https://{{cluster_endpoint}}/kafka/v3/clusters/{{cluster_id}}/links' \
--header 'Content-Type: application/json' \
--header 'Authorization: Basic YWRtaW46YWRtaW4='
response:
{
"kind": "KafkaLinkDataList",
"metadata": {
"self": "https://rb-b89faf8f4ef1fa22-internal.csp.aliyuncs.com:8090/kafka/v3/clusters/RCJUOzrpQIqBZkxfjx63Hw/links",
"next": null
},
"data": [
{
"kind": "KafkaLinkData",
"metadata": {
"self": "https://rb-b89faf8f4ef1fa22-internal.csp.aliyuncs.com:8090/kafka/v3/clusters/RCJUOzrpQIqBZkxfjx63Hw/links/link-si1"
},
"source_cluster_id": "MToL0AbqRMqxznu7C7CPOw",
"destination_cluster_id": null,
"link_name": "link-si1",
"link_id": "e6af9bc0-c3ff-4d50-b110-092d83ed37cc",
"cluster_link_id": "5q-bwMP_TVCxEAktg-03zA",
"topic_names": [
"test-si1"
]
},
{
"kind": "KafkaLinkData",
"metadata": {
"self": "https://rb-b89faf8f4ef1fa22-internal.csp.aliyuncs.com:8090/kafka/v3/clusters/RCJUOzrpQIqBZkxfjx63Hw/links/link-di1"
},
"source_cluster_id": "MToL0AbqRMqxznu7C7CPOw",
"destination_cluster_id": null,
"link_name": "link-di1",
"link_id": "d8b5084e-9082-4fe4-8336-d9f7290f0a6b",
"cluster_link_id": "2LUITpCCT-SDNtn3KQ8Kaw",
"topic_names": [
"test1"
]
}
]
}
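When scripting against this endpoint, the response above can be reduced to a link-to-topics summary. The helper below is illustrative and assumes `python3` is available on the client (`jq` would work equally well):

```shell
# Illustrative: print "link_name<TAB>comma-separated mirror topics" for each
# entry in a KafkaLinkDataList response read from stdin.
summarize_links() {
  python3 -c '
import json, sys

doc = json.load(sys.stdin)
for link in doc.get("data", []):
    print(link["link_name"] + "\t" + ",".join(link["topic_names"]))
'
}

# Intended use:
# curl -s 'https://{{cluster_endpoint}}/kafka/v3/clusters/{{cluster_id}}/links' ... | summarize_links
```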
# 2 - Delete a cluster link
# cluster_endpoint example: rb-b89faf8f4ef1fa22-internal.csp.aliyuncs.com:8090
# cluster_id example: RCJUOzrpQIqBZkxfjx63Hw
# link_name example: link-di1
curl --location --request DELETE 'https://{{cluster_endpoint}}/kafka/v3/clusters/{{cluster_id}}/links/{{link_name}}' \
--header 'Authorization: Basic YWRtaW46YWRtaW4='
References
- Official documentation: Cluster Linking
