Kafka MirrorMaker
测试
环境
源Kafka集群:版本:kafka_2.11-2.0.0,地址:cluster2。
目标Kafka集群:版本:kafka_2.10-0.10.1,地址:cluster3。
脚本
目标Kafka集群新建消费者配置 $KAFKA_HOME/test/mirror-consumer.properties:
bootstrap.servers=cluster2:9092group.id=test-mirrorauto.offset.reset=earliestpartition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
目标Kafka集群新建生产者配置 $KAFKA_HOME/test/mirror-producer.properties:
bootstrap.servers=cluster3:9092
源Kafka集群和目标Kafka集群新增脚本 $KAFKA_HOME/test/print-offset.sh,检查指定主题的各个分区的起始位移和结束位移:
echo "topic: $1"
echo "offset start:"
$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic $1 --time -2
echo "offset end:"
$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic $1 --time -1
数据
源Kafka集群创建主题和分区:
$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-mirror --partitions 32
目标Kafka集群创建主题和分区:
# 0.10.1版本通过--zookeeper指定集群位置
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test-mirror --partitions 32 --replication-factor 1
源Kafka集群准备测试数据:
public class TestMirrorProducer {
/**
*
* @param args 格式:servers topic record_num
*/
public static void main(String[] args) {
String servers = "cluster2:9092"; //args[0];
String topic = "test-mirror"; //args[1];
int recordNum = 1000000; //Integer.valueOf(args[2]);
long time = System.nanoTime();
Producer<String, Long> producer = createProducer(servers);
List<PartitionInfo> partitionInfos = producer.partitionsFor(topic);
int partitionSize = partitionInfos.size();
for (int i = 0; i < recordNum; i++) {
ProducerRecord<String, Long> record = new ProducerRecord<>(topic, i % partitionSize, null, (long) i);
producer.send(record);
}
producer.flush();
producer.close();
long spend = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - time);
System.out.println(String.format("主题:%s, 数据:%s, 耗时:%sms", topic, recordNum, spend));
}
private static Producer<String, Long> createProducer(String bootstrapServers) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "mirror-maker-producer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
return new KafkaProducer<>(props);
}
}
同步数据
目标kafka集群启动MirrorMaker脚本:
# 在$KAFKA_HOME根目录执行
bin/kafka-mirror-maker.sh --consumer.config test/mirror-consumer.properties --producer.config test/mirror-producer.properties --num.streams 5 --whitelist test-mirror
结果对比
源Kafka集群执行print-offset.sh,查看主题test-mirror下各分区的数据:
topic: test-mirror
offset start:
test-mirror:0:0
test-mirror:1:0
...
test-mirror:31:0
offset end:
test-mirror:0:31250
test-mirror:1:31250
...
test-mirror:31:31250
目标Kafka集群执行print-offset.sh,查看主题test-mirror下各分区的数据:
topic: test-mirror
offset start:
test-mirror:0:0
test-mirror:1:0
...
test-mirror:31:0
offset end:
test-mirror:0:31250
test-mirror:1:31250
...
test-mirror:31:31250
总结
- 反复测试的时候,需要修改test/mirror-consumer.properties的参数group.id,清理消费者位移;重建目标Kafka集群的主题。
- 需要在启动脚本中通过whitelist指定需要同步的主题,维护不方便。
生产配置
启动脚本mirror-maker.sh
每一个参数的含义可以阅读kafka-mirror-maker.sh和kafka-run-class.sh两个脚本。
KAFKA_HOME=/opt/kafka2
MIRROR_MAKER_DIR=$KAFKA_HOME/mirror-maker
export DAEMON_MODE=true
export LOG_DIR=$MIRROR_MAKER_DIR/logs
export CONSOLE_OUTPUT_FILE=$LOG_DIR/console.out
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$MIRROR_MAKER_DIR/log4j.properties"
export JMX_PORT=25894
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT -XX:+DisableAttachMechanism -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false "
$KAFKA_HOME/bin/kafka-mirror-maker.sh --consumer.config $MIRROR_MAKER_DIR/consumer.properties --producer.config $MIRROR_MAKER_DIR/producer.properties --num.streams 5 --whitelist $1
日志配置文件log4j.properties
log4j.rootLogger=INFO, RollingAppender
log4j.appender.RollingAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.RollingAppender.File=${kafka.logs.dir}/mirrormaker.log
log4j.appender.RollingAppender.DatePattern='.'yyyy-MM-dd
log4j.appender.RollingAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.RollingAppender.layout.ConversionPattern=[%p] %d %c %M - %m%n
consumer.properties和producer.properties同测试阶段配置。
参考:
源码分析
消费者配置
说明:
- 不自动提交消费位移
- 不解析消息
// Disable consumer auto offsets commit to prevent data loss.
maybeSetDefaultProperty(consumerConfigProps, "enable.auto.commit", "false")
// Hardcode the deserializer to ByteArrayDeserializer
consumerConfigProps.setProperty("key.deserializer", classOf[ByteArrayDeserializer].getName)
consumerConfigProps.setProperty("value.deserializer", classOf[ByteArrayDeserializer].getName)
// The default client id is group id, we manually set client id to groupId-index to avoid metric collision
val groupIdString = consumerConfigProps.getProperty("group.id")
val consumers = (0 until numStreams) map { i =>
consumerConfigProps.setProperty("client.id", groupIdString + "-" + i.toString)
new KafkaConsumer[Array[Byte], Array[Byte]](consumerConfigProps)
}
生产者配置
说明:
- 异步发送消息
- 避免消息丢失
- 不解析消息
val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
val sync = producerProps.getProperty("producer.type", "async").equals("sync")
producerProps.remove("producer.type")
// Defaults to no data loss settings.
maybeSetDefaultProperty(producerProps, ProducerConfig.RETRIES_CONFIG, Int.MaxValue.toString)
maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString)
maybeSetDefaultProperty(producerProps, ProducerConfig.ACKS_CONFIG, "all")
maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
// Always set producer key and value serializer to ByteArraySerializer.
producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
producer = new MirrorMakerProducer(sync, producerProps)
消费消息
说明:
- 启动参数—num.streams配置了consumer数量,MirrorMakerThread数量等于consumer数量,所有consumer共享一个producer。
- producer默认异步发送消息,发送失败默认会关闭producer。
- 消费者位移默认1分钟提交一次。
- 如果producer最后没有把消息发出去,消息就丢失了,丢失的消息会记录在numDroppedMessages,可以通过jmx查询。
class MirrorMakerThread {
override def run() {
while (!exitingOnSendFailure && !shuttingDown) {
val data = consumerWrapper.receive()
val records = messageHandler.handle(toBaseConsumerRecord(data))
records.asScala.foreach(producer.send)
maybeFlushAndCommitOffsets()
}
producer.flush()
commitOffsets(consumerWrapper)
}
def maybeFlushAndCommitOffsets() {
if (System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) {
debug("Committing MirrorMaker state.")
producer.flush()
commitOffsets(consumerWrapper)
lastOffsetCommitMs = System.currentTimeMillis()
}
}
}
class MirrorMakerProducer {
def send() {
if (sync) {
this.producer.send(record).get()
} else {
this.producer.send(record,
new MirrorMakerProducerCallback(record.topic(), record.key(), record.value()))
}
}
}
class MirrorMakerProducerCallback {
override def onCompletion() {
if (exception != null) {
// Use default call back to log error. This means the max retries of producer has reached and message
// still could not be sent.
super.onCompletion(metadata, exception)
// If abort.on.send.failure is set, stop the mirror maker. Otherwise log skipped message and move on.
if (abortOnSendFailure) {
info("Closing producer due to send failure.")
exitingOnSendFailure = true
producer.close(0)
}
numDroppedMessages.incrementAndGet()
}
}
}
停止应用
说明:
- 通过kill -TERM pid停止应用。
- 正常情况下,producer.close()会把所有消息写入目标kafka之后再停止。
def main() {
Runtime.getRuntime.addShutdownHook(new Thread("MirrorMakerShutdownHook") {
override def run() {
cleanShutdown()
}
})
}
def cleanShutdown() {
if (isShuttingDown.compareAndSet(false, true)) {
info("Start clean shutdown.")
// Shutdown consumer threads.
info("Shutting down consumer threads.")
if (mirrorMakerThreads != null) {
mirrorMakerThreads.foreach(_.shutdown())
mirrorMakerThreads.foreach(_.awaitShutdown())
}
info("Closing producer.")
producer.close()
info("Kafka mirror maker shutdown successfully")
}
}
监控
监控指标
- 消费Lag,以主题为单位。
- 消费端读取字节数/秒,以主题为单位。
- 消费端读取消息数/秒,以主题为单位。
- 生产端写入字节数/秒,以主题为单位。
- 生产端写入消息数/秒,以主题为单位。
- 丢弃消息总数:kafka.tools.MirrorMaker.MirrorMaker-numDroppedMessages。
uReplicator
Kafka MirrorMaker vs uReplicator
如果只需要一个节点,使用MirrorMaker即可。如果需要多个节点,uReplicator更方便,它提供了集群管理和运维功能。
Kafka MirrorMaker不足:
- 添加主题困难。MirrorMaker需要在启动时指定管理的主题,修改主题需要重启。
- 昂贵的消费者重平衡。使用MirrorMaker集群时,单次平衡可能达到5-10分钟。
- 元数据同步问题。使用MirrorMaker集群时,MirrorMaker节点的主题配置不一致,导致集群崩溃。
uReplicator优势:
- 通过REST API动态添加删除主题。
- 重新实现了主题和分区的消费方案,减少全局重平衡带来的影响。
- 通过Apache Helix管理uReplicator集群,实现了故障自动迁移。
参考:
uReplicator: Uber Engineering’s Robust Apache Kafka Replicator
CSDN-MirrorMaker和uReplicator快速开始
测试
构建
源码:https://github.com/uber/uReplicator
版本:branch-1.0
构建命令:mvn clean package -DskipTests
构建结果:
- $UREPLICATOR_CODE_HOME/uReplicator-Distribution/target/uReplicator-Distribution-pkg/bin:示例脚本
- $UREPLICATOR_CODE_HOME/uReplicator-Distribution/target/uReplicator-Distribution-pkg/repo:构建后的jar包
- $UREPLICATOR_CODE_HOME/config:示例配置
目录组织:
将目录bin, repo, config平行放置到服务器uReplicator根目录下。
uReplicator
|--bin
|--repo
|--config
参考:
uReplicator branch-1.0 README.md文件
配置说明
uReplicator controller启动脚本
脚本位置:bin/start-controller-example1.sh。
# 最后一句
exec "$JAVACMD" $JAVA_OPTS -Dapp_name=uReplicator-Controller \
-classpath "$CLASSPATH" \
-Dapp.name="start-controller-example1" \
-Dapp.pid="$$" \
-Dapp.repo="$REPO" \
-Dapp.home="$BASEDIR" \
-Dbasedir="$BASEDIR" \
com.uber.stream.kafka.mirrormaker.starter.MirrorMakerStarter \
startMirrorMakerController -example1 "$@"
其中-example1代表的参数在代码中写死:
public class ControllerStarter {
public static ControllerStarter init(CommandLine cmd) {
ControllerConf conf = null;
if (cmd.hasOption("example1")) {
conf = getDefaultConf();
} else if (cmd.hasOption("example2")) {
conf = getExampleConf();
} else {
try {
conf = ControllerConf.getControllerConf(cmd);
} catch (Exception e) {
throw new RuntimeException("Not valid controller configurations!", e);
}
}
final ControllerStarter starter = new ControllerStarter(conf);
return starter;
}
public static ControllerConf getDefaultConf() {
final ControllerConf conf = new ControllerConf();
conf.setControllerPort("9000");
conf.setZkStr("localhost:2181");
conf.setHelixClusterName("testMirrorMaker");
conf.setDeploymentName("testDeploymentName");
conf.setBackUpToGit("false");
conf.setAutoRebalanceDelayInSeconds("120");
conf.setLocalBackupFilePath("/var/log/kafka-mirror-maker-controller");
return conf;
}
}
可以通过具体的参数来代替-example1,方便修改参数值。
-port 9000 -zookeeper localhost:2181 -helixClusterName testMirrorMaker -deploymentName testDeploymentName -backUpToGit false -autoRebalanceDelayInSeconds 120 -localBackupFilePath /var/log/kafka-mirror-maker-controller
脚本bin/start-controller-example2.sh和bin/start-controller-example1.sh的区别在于,启动参数为-example2。只要源和目标kafka存在相同的主题,就会自动同步数据,不需要在whitelist中指定。
uReplicator worker启动脚本
脚本位置:bin/start-worker-example1.sh。
# 最后一句
exec "$JAVACMD" $JAVA_OPTS -Dapp_name=uReplicator-Worker \
-classpath "$CLASSPATH" \
-Dapp.name="start-worker-example1" \
-Dapp.pid="$$" \
-Dapp.repo="$REPO" \
-Dapp.home="$BASEDIR" \
-Dbasedir="$BASEDIR" \
com.uber.stream.kafka.mirrormaker.starter.MirrorMakerStarter \
startMirrorMakerWorker kafka.mirrormaker.MirrorMakerWorker \
--consumer.config config/consumer.properties \
--producer.config config/producer.properties \
--helix.config config/helix.properties \
--topic.mappings config/topicmapping.properties "$@"
config/consumer.properties指定了源kafka消费者配置。
# 源kafka的zookeeper根目录。如果没有指定cluster_name,就配置成src_kafka_zookeeper_host:2181。
zookeeper.connect=src_kafka_zookeeper_host:2181/cluster_name
...
config/producer.properties指定了目标kafka生产者配置。
# 目标kafka的broker地址
bootstrap.servers=target_kafka:9092
...
config/helix.properties指定了Apache Helix服务配置。
# Apache Helix服务依赖的zookeeper地址
zkServer=localhost:2181
# uReplicator worker作为Apache Helix Node的id
instanceId=testHelixMirrorMaker01
# Apache Helix Cluster id
helixClusterName=testMirrorMaker
config/topicmapping.properties指定了源kafka主题名称和目标kafka主题名称的映射关系,如果名称保持一致,则无需指定这个配置。
启动
在controller和worker的启动脚本中加入JAVA_OPTS="-Dlog4j.configuration=file:$BASEDIR/config/log4j.properties",可以为服务指定log4j配置,让日志输出到文件。两个服务最好采用不同的配置。
启动uReplicator controller:
nohup bin/start-controller-example1.sh 2>&1 >controller.out &
启动uReplicator worker:
nohup bin/start-worker-example1.sh 2>&1 >work.out &
同步数据
在源kafka所在服务器执行,源kafka创建主题:
# 源kafka版本为kafka_2.11-2.0.0
$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-mirror --partitions 5 --replication-factor 1
在目标kafka所在服务器执行,目标kafka创建主题:
# 目标kafka版本为kafka_2.10-0.10.1。主题名称、分区数和源kafka保持一致。
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test-mirror --partitions 5 --replication-factor 1
在源kafka所在服务器执行,源kafka创建测试数据:
$KAFKA_HOME/bin/kafka-producer-perf-test.sh --topic test-mirror --num-records 100000000 --record-size=100 --throughput -1 --producer-props bootstrap.servers=localhost:9092
在uReplicator controller所在服务器执行,uReplicator添加管理主题:
curl -X POST -d '{"topic":"test-mirror", "numPartitions":"5"}' http://localhost:9000/topics
在目标kafka所在服务器执行,观察目标kafka主题同步情况:
# print-offset.sh为MirrorMaker那一节介绍的脚本
$KAFKA_HOME/test/print-offset.sh test-mirror
源码分析
REST服务
uReplicator controller启动时,创建ControllerRestApplication服务,绑定到-port参数指定的端口。
public class ControllerInstance {
public ControllerInstance() {
_component = new Component();
_controllerRestApp = new ControllerRestApplication(null);
}
public void start() throws Exception {
_component.getServers().add(Protocol.HTTP, Integer.parseInt(_config.getControllerPort()));
final Context applicationContext = _component.getContext().createChildContext();
_controllerRestApp.setContext(applicationContext);
_component.getDefaultHost().attach(_controllerRestApp);
}
}
ControllerRestApplication提供的服务:
public class ControllerRestApplication extends Application {
@Override
public Restlet createInboundRoot() {
final Router router = new Router(getContext());
router.setDefaultMatchingMode(Template.MODE_EQUALS);
// Topic Servlet
router.attach("/topics", TopicManagementRestletResource.class);
router.attach("/topics/{topicName}", TopicManagementRestletResource.class);
// Health Check Servlet
router.attach("/health", HealthCheckRestletResource.class);
// Validation Servlet
router.attach("/validation", ValidationRestletResource.class);
router.attach("/validation/{option}", ValidationRestletResource.class);
// Admin Servlet
router.attach("/admin", AdminRestletResource.class);
router.attach("/admin/{opt}", AdminRestletResource.class);
// MirrorMakerManager Servlet
router.attach("/instances", MirrorMakerManagerRestletResource.class);
router.attach("/instances/{instanceName}", MirrorMakerManagerRestletResource.class);
router.attach("/offset", TopicParitionOffsetRestletResource.class);
router.attach("/offset/{topic}/{partition}", TopicParitionOffsetRestletResource.class);
router.attach("/noprogress", NoProgressTopicPartitionRestletResource.class);
return router;
}
}
其中/instances可以查看uReplicator集群的节点状态。
/topics可以添加删除主题,查询主题的明细,每一个分区由哪一个uReplicator worker消费。
/offset可以查看消费延迟情况,通过读取zookeeper的/consumer/$consumer_group/offsets/$topic/$partition节点信息实现。我本地测试时/consumer下没有数据,所以请求/offset并没有返回信息。
消费消息
uReplicator worker消费消息的逻辑和MirrorMaker基本一致,具体实现参考代码MirrorMakerThread.run。
class MirrorMakerThread(connector: KafkaConnector) {
override def run() {
val stream = connector.getStream()
val iter = stream.iterator()
while (!exitingOnSendFailure && !shuttingDown && iter.hasNext()) {
val data = iter.next()
val records = messageHandler.handle(data)
val iterRecords = records.iterator()
while (iterRecords.hasNext) {
val record = iterRecords.next()
if (!filterEnabled || needToSend(record, srcCluster, dstCluster, data.offset)) {
producer.send(record, data.partition, data.offset)
}
}
maybeFlushAndCommitOffsets(false)
}
...
}
}
消费者再均衡
uReplicator通过Apache Helix实现故障转移,uReplicator worker实例就是Apache Helix Cluster的Node,uReplicator管理的主题就是Apache Helix Cluster的资源。当我们通过测试章节提到的流程,启动一个controller和一个worker时,可以在zookeeper中执行查询ls -R /testMirrorMaker,查看Apache Helix的信息:
/testMirrorMaker/CONFIGS
/testMirrorMaker/CONFIGS/CLUSTER
/testMirrorMaker/CONFIGS/CLUSTER/testMirrorMaker
/testMirrorMaker/CONFIGS/PARTICIPANT
/testMirrorMaker/CONFIGS/PARTICIPANT/testHelixMirrorMaker01
/testMirrorMaker/CONFIGS/RESOURCE
/testMirrorMaker/CONFIGS/RESOURCE/test-mirror
/testMirrorMaker/INSTANCES
/testMirrorMaker/INSTANCES/testHelixMirrorMaker01
/testMirrorMaker/INSTANCES/testHelixMirrorMaker01/CURRENTSTATES/100d394bcd8000d/test-mirror
/testMirrorMaker/LIVEINSTANCES
/testMirrorMaker/LIVEINSTANCES/testHelixMirrorMaker01
...
当uReplicator worker出现故障时,会通过AutoRebalanceLiveInstanceChangeListener将该worker处理分区交给其他worker,分配策略是优先找空闲的、负载小的worker。
疑问:
所有worker的消费组id是一样的,不然再均衡之后不能从上次消费的offset位置继续消费。但如果所有worker的消费组id一致,不是就会触发kafka的消费者再平衡吗?可能是哪里禁用了这种平衡机制。
kafka.mirrormaker.KafkaConnector消费数据时,并没有使用org.apache.kafka.clients.consumer.KafkaConsumer,而是使用kafka.consumer.KafkaStream#KafkaStream,这个类并不需要指定消费组id。
我的思考
uReplicator如何实现Kafka集群之间的数据同步
uReplicator包括controller和worker节点,controller节点负责将kafka的主题和分区分配给worker节点,worker节点负责将指定的主题和分区数据同步到目标kafka集群中。
uReplicator通过Apache Helix实现容错,Apache Helix依赖zookeeper服务。一个worker节点挂掉后,它负责的主题和分区会交给其他worker节点处理,分配的策略是优先选择空闲的、负载低的节点,这种策略会尽力避免类似kafka消费者组重平衡的全局影响。
uReplicator同步数据的核心逻辑和MirrorMaker基本一致,不同的地方在于uReplicator采用kafka.consumer.KafkaStream#KafkaStream消费数据,MirrorMaker采用org.apache.kafka.clients.consumer.KafkaConsumer消费数据。这也是uReplicator不会受消费者组影响的原因。
MirrorMaker和uReplicator的适用范围
如果数据同步节点较少,比如小于5个,更适合使用MirrorMaker,而不是uReplicator。
uReplicator提供了便利的集群管理功能,但是需要管理zookeeper服务、controller节点和worker节点,复杂度更高。只有当集群规模足够大,管理成本足够高,或者消费者组重平衡代价足够大,才需要使用uReplicator。
至于uReplicator提供的容错功能,MirrorMaker也有,就是消费者组重平衡,这个方案更简洁。
