Kafka MirrorMaker

测试

环境

源Kafka集群:版本:kafka_2.11-2.0.0,地址:cluster2。

目标Kafka集群:版本:kafka_2.10-0.10.1,地址:cluster3。

脚本

目标Kafka集群新建消费者配置 $KAFKA_HOME/test/mirror-consumer.properties:

  1. bootstrap.servers=cluster2:9092
  2. group.id=test-mirror
  3. auto.offset.reset=earliest
  4. partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor

目标Kafka集群新建生产者配置 $KAFKA_HOME/test/mirror-producer.properties:

bootstrap.servers=cluster3:9092

源Kafka集群和目标Kafka集群新增脚本 $KAFKA_HOME/test/print-offset.sh,检查指定主题的各个分区的起始位移和结束位移:

echo "topic: $1"
echo "offset start:"
$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic $1 --time -2
echo "offset end:"
$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic $1 --time -1

数据

源Kafka集群创建主题和分区:

$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-mirror --partitions 32

目标Kafka集群创建主题和分区:

# 0.10.1版本通过--zookeeper指定集群位置
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test-mirror --partitions 32 --replication-factor 1

源Kafka集群准备测试数据:

public class TestMirrorProducer {

    /**
     *
     * @param args 格式:servers topic record_num 
     */
    public static void main(String[] args) {
        String servers = "cluster2:9092"; //args[0];
        String topic = "test-mirror"; //args[1];
        int recordNum = 1000000; //Integer.valueOf(args[2]);

        long time = System.nanoTime();
        Producer<String, Long> producer = createProducer(servers);
        List<PartitionInfo> partitionInfos = producer.partitionsFor(topic);
        int partitionSize = partitionInfos.size();
        for (int i = 0; i < recordNum; i++) {
            ProducerRecord<String, Long> record = new ProducerRecord<>(topic, i % partitionSize, null, (long) i);
            producer.send(record);
        }
        producer.flush();
        producer.close();
        long spend = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - time);
        System.out.println(String.format("主题:%s, 数据:%s, 耗时:%sms", topic, recordNum, spend));
    }

    private static Producer<String, Long> createProducer(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "mirror-maker-producer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        return new KafkaProducer<>(props);
    }
}

同步数据

目标kafka集群启动MirrorMaker脚本:

# 在$KAFKA_HOME根目录执行
bin/kafka-mirror-maker.sh --consumer.config test/mirror-consumer.properties --producer.config test/mirror-producer.properties --num.streams 5 --whitelist test-mirror

结果对比

源Kafka集群执行print-offset.sh,查看主题test-mirror下各分区的数据:

topic: test-mirror
offset start:
test-mirror:0:0
test-mirror:1:0
...
test-mirror:31:0
offset end:
test-mirror:0:31250
test-mirror:1:31250
...
test-mirror:31:31250

目标Kafka集群执行print-offset.sh,查看主题test-mirror下各分区的数据:

topic: test-mirror
offset start:
test-mirror:0:0
test-mirror:1:0
...
test-mirror:31:0
offset end:
test-mirror:0:31250
test-mirror:1:31250
...
test-mirror:31:31250

总结

  1. 反复测试的时候,需要修改test/mirror-consumer.properties的参数group.id,清理消费者位移;重建目标Kafka集群的主题。
  2. 需要在启动脚本中通过whitelist指定需要同步的主题,维护不方便。

生产配置

启动脚本mirror-maker.sh

每一个参数的含义可以阅读kafka-mirror-maker.sh和kafka-run-class.sh两个脚本。

KAFKA_HOME=/opt/kafka2
MIRROR_MAKER_DIR=$KAFKA_HOME/mirror-maker

export DAEMON_MODE=true
export LOG_DIR=$MIRROR_MAKER_DIR/logs
export CONSOLE_OUTPUT_FILE=$LOG_DIR/console.out
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$MIRROR_MAKER_DIR/log4j.properties"
export JMX_PORT=25894
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT -XX:+DisableAttachMechanism -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false "

$KAFKA_HOME/bin/kafka-mirror-maker.sh --consumer.config $MIRROR_MAKER_DIR/consumer.properties --producer.config $MIRROR_MAKER_DIR/producer.properties --num.streams 5 --whitelist $1

日志配置文件log4j.properties

log4j.rootLogger=INFO, RollingAppender
log4j.appender.RollingAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.RollingAppender.File=${kafka.logs.dir}/mirrormaker.log
log4j.appender.RollingAppender.DatePattern='.'yyyy-MM-dd
log4j.appender.RollingAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.RollingAppender.layout.ConversionPattern=[%p] %d %c %M - %m%n

consumer.properties和producer.properties同测试阶段配置。

参考:

JMX只使用一个端口

MM避免消息丢失

源码分析

消费者配置

说明:

  1. 不自动提交消费位移
  2. 不解析消息
// Disable consumer auto offsets commit to prevent data loss.
maybeSetDefaultProperty(consumerConfigProps, "enable.auto.commit", "false")
// Hardcode the deserializer to ByteArrayDeserializer
consumerConfigProps.setProperty("key.deserializer", classOf[ByteArrayDeserializer].getName)
consumerConfigProps.setProperty("value.deserializer", classOf[ByteArrayDeserializer].getName)
// The default client id is group id, we manually set client id to groupId-index to avoid metric collision
val groupIdString = consumerConfigProps.getProperty("group.id")
val consumers = (0 until numStreams) map { i =>
  consumerConfigProps.setProperty("client.id", groupIdString + "-" + i.toString)
  new KafkaConsumer[Array[Byte], Array[Byte]](consumerConfigProps)
}

生产者配置

说明:

  1. 异步发送消息
  2. 避免消息丢失
  3. 不解析消息
val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
val sync = producerProps.getProperty("producer.type", "async").equals("sync")
producerProps.remove("producer.type")
// Defaults to no data loss settings.
maybeSetDefaultProperty(producerProps, ProducerConfig.RETRIES_CONFIG, Int.MaxValue.toString)
maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString)
maybeSetDefaultProperty(producerProps, ProducerConfig.ACKS_CONFIG, "all")
maybeSetDefaultProperty(producerProps, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
// Always set producer key and value serializer to ByteArraySerializer.
producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
producer = new MirrorMakerProducer(sync, producerProps)

消费消息

说明:

  1. 启动参数—num.streams配置了consumer数量,MirrorMakerThread数量等于consumer数量,所有consumer共享一个producer。
  2. producer默认异步发送消息,发送失败默认会关闭producer。
  3. 消费者位移默认1分钟提交一次。
  4. 如果producer最后没有把消息发出去,消息就丢失了,丢失的消息会记录在numDroppedMessages,可以通过jmx查询。
class MirrorMakerThread {
    override def run() {
        while (!exitingOnSendFailure && !shuttingDown) {
            val data = consumerWrapper.receive()
            val records = messageHandler.handle(toBaseConsumerRecord(data))
            records.asScala.foreach(producer.send)
            maybeFlushAndCommitOffsets()
        }
        producer.flush()
        commitOffsets(consumerWrapper)
    }

    def maybeFlushAndCommitOffsets() {
        if (System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) {
            debug("Committing MirrorMaker state.")
            producer.flush()
            commitOffsets(consumerWrapper)
            lastOffsetCommitMs = System.currentTimeMillis()
        }
    }
}

class MirrorMakerProducer {
    def send() {
        if (sync) {
            this.producer.send(record).get()
        } else {
            this.producer.send(record,
                               new MirrorMakerProducerCallback(record.topic(), record.key(), record.value()))
        }
    }
}

class MirrorMakerProducerCallback {
    override def onCompletion() {
        if (exception != null) {
            // Use default call back to log error. This means the max retries of producer has reached and message
            // still could not be sent.
            super.onCompletion(metadata, exception)
            // If abort.on.send.failure is set, stop the mirror maker. Otherwise log skipped message and move on.
            if (abortOnSendFailure) {
                info("Closing producer due to send failure.")
                exitingOnSendFailure = true
                producer.close(0)
            }
            numDroppedMessages.incrementAndGet()
        }
    }
}

停止应用

说明:

  1. 通过kill -TERM pid停止应用。
  2. 正常情况下,producer.close()会把所有消息写入目标kafka之后再停止。
def main() {
    Runtime.getRuntime.addShutdownHook(new Thread("MirrorMakerShutdownHook") {
        override def run() {
            cleanShutdown()
        }
    })
}

def cleanShutdown() {
    if (isShuttingDown.compareAndSet(false, true)) {
        info("Start clean shutdown.")
        // Shutdown consumer threads.
        info("Shutting down consumer threads.")
        if (mirrorMakerThreads != null) {
            mirrorMakerThreads.foreach(_.shutdown())
            mirrorMakerThreads.foreach(_.awaitShutdown())
        }
        info("Closing producer.")
        producer.close()
        info("Kafka mirror maker shutdown successfully")
    }
}

监控

监控指标

  1. 消费Lag,以主题为单位。
  2. 消费端读取字节数/秒,以主题为单位。
  3. 消费端读取消息数/秒,以主题为单位。
  4. 生产端写入字节数/秒,以主题为单位。
  5. 生产端写入消息数/秒,以主题为单位。
  6. 丢弃消息总数:kafka.tools.MirrorMaker.MirrorMaker-numDroppedMessages。

uReplicator

Kafka MirrorMaker vs uReplicator

如果只需要一个节点,使用MirrorMaker即可。如果需要多个节点,uReplicator更方便,它提供了集群管理和运维功能。

Kafka MirrorMaker不足:

  1. 添加主题困难。MirrorMaker需要在启动时指定管理的主题,修改主题需要重启。
  2. 昂贵的消费者重平衡。使用MirrorMaker集群时,单次平衡可能达到5-10分钟。
  3. 元数据同步问题。使用MirrorMaker集群时,MirrorMaker节点的主题配置不一致,导致集群崩溃。

uReplicator优势

  1. 通过REST API动态添加删除主题。
  2. 重新实现了主题和分区的消费方案,减少全局重平衡带来的影响。
  3. 通过Apache Helix管理uReplicator集群,实现了故障自动迁移。

参考:

uReplicator: Uber Engineering’s Robust Apache Kafka Replicator

CSDN-MirrorMaker和uReplicator快速开始

uReplicator User Guide

Apache Helix Quick Start

测试

构建

源码:https://github.com/uber/uReplicator

版本:branch-1.0

构建命令:mvn clean package -DskipTests

构建结果:

  • $UREPLICATOR_CODE_HOME/uReplicator-Distribution/target/uReplicator-Distribution-pkg/bin:示例脚本
  • $UREPLICATOR_CODE_HOME/uReplicator-Distribution/target/uReplicator-Distribution-pkg/repo:构建后的jar包
  • $UREPLICATOR_CODE_HOME/config:示例配置

目录组织:

将目录bin, repo, config平行放置到服务器uReplicator根目录下。

uReplicator
|--bin
|--repo
|--config

参考:

uReplicator User Guide

uReplicator branch-1.0 README.md文件

配置说明

uReplicator controller启动脚本

脚本位置:bin/start-controller-example1.sh。

# 最后一句
exec "$JAVACMD" $JAVA_OPTS -Dapp_name=uReplicator-Controller \
  -classpath "$CLASSPATH" \
  -Dapp.name="start-controller-example1" \
  -Dapp.pid="$$" \
  -Dapp.repo="$REPO" \
  -Dapp.home="$BASEDIR" \
  -Dbasedir="$BASEDIR" \
  com.uber.stream.kafka.mirrormaker.starter.MirrorMakerStarter \
  startMirrorMakerController -example1 "$@"

其中-example1代表的参数在代码中写死:

public class ControllerStarter {
    public static ControllerStarter init(CommandLine cmd) {
        ControllerConf conf = null;
        if (cmd.hasOption("example1")) {
            conf = getDefaultConf();
        } else if (cmd.hasOption("example2")) {
            conf = getExampleConf();
        } else {
            try {
                conf = ControllerConf.getControllerConf(cmd);
            } catch (Exception e) {
                throw new RuntimeException("Not valid controller configurations!", e);
            }
        }
        final ControllerStarter starter = new ControllerStarter(conf);
        return starter;
    }

    public static ControllerConf getDefaultConf() {
        final ControllerConf conf = new ControllerConf();
        conf.setControllerPort("9000");
        conf.setZkStr("localhost:2181");
        conf.setHelixClusterName("testMirrorMaker");
        conf.setDeploymentName("testDeploymentName");
        conf.setBackUpToGit("false");
        conf.setAutoRebalanceDelayInSeconds("120");
        conf.setLocalBackupFilePath("/var/log/kafka-mirror-maker-controller");
        return conf;
    }
}

可以通过具体的参数来代替-example1,方便修改参数值。

-port 9000 -zookeeper localhost:2181 -helixClusterName testMirrorMaker -deploymentName testDeploymentName -backUpToGit false -autoRebalanceDelayInSeconds 120 -localBackupFilePath /var/log/kafka-mirror-maker-controller

脚本bin/start-controller-example2.sh和bin/start-controller-example1.sh的区别在于,启动参数为-example2。只要源和目标kafka存在相同的主题,就会自动同步数据,不需要在whitelist中指定。

uReplicator worker启动脚本

脚本位置:bin/start-worker-example1.sh。

# 最后一句
exec "$JAVACMD" $JAVA_OPTS -Dapp_name=uReplicator-Worker \
  -classpath "$CLASSPATH" \
  -Dapp.name="start-worker-example1" \
  -Dapp.pid="$$" \
  -Dapp.repo="$REPO" \
  -Dapp.home="$BASEDIR" \
  -Dbasedir="$BASEDIR" \
  com.uber.stream.kafka.mirrormaker.starter.MirrorMakerStarter \
  startMirrorMakerWorker kafka.mirrormaker.MirrorMakerWorker \
                  --consumer.config config/consumer.properties \
                  --producer.config config/producer.properties \
                  --helix.config config/helix.properties \
                  --topic.mappings config/topicmapping.properties "$@"

config/consumer.properties指定了源kafka消费者配置。

# 源kafka的zookeeper根目录。如果没有指定cluster_name,就配置成src_kafka_zookeeper_host:2181。
zookeeper.connect=src_kafka_zookeeper_host:2181/cluster_name
...

config/producer.properties指定了目标kafka生产者配置。

# 目标kafka的broker地址
bootstrap.servers=target_kafka:9092
...

config/helix.properties指定了Apache Helix服务配置。

# Apache Helix服务依赖的zookeeper地址
zkServer=localhost:2181
# uReplicator worker作为Apache Helix Node的id
instanceId=testHelixMirrorMaker01
# Apache Helix Cluster id
helixClusterName=testMirrorMaker

config/topicmapping.properties指定了源kafka主题名称和目标kafka主题名称的映射关系,如果名称保持一致,则无需指定这个配置。

启动

在controller和worker的启动脚本中加入JAVA_OPTS="-Dlog4j.configuration=file:$BASEDIR/config/log4j.properties",可以为服务指定log4j配置,让日志输出到文件。两个服务最好采用不同的配置。

启动uReplicator controller:

nohup bin/start-controller-example1.sh 2>&1 >controller.out &

启动uReplicator worker:

nohup bin/start-worker-example1.sh 2>&1 >work.out &

同步数据

在源kafka所在服务器执行,源kafka创建主题:

# 源kafka版本为kafka_2.11-2.0.0
$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-mirror --partitions 5 --replication-factor 1

在目标kafka所在服务器执行,目标kafka创建主题:

# 目标kafka版本为kafka_2.10-0.10.1。主题名称、分区数和源kafka保持一致。
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test-mirror --partitions 5 --replication-factor 1

在源kafka所在服务器执行,源kafka创建测试数据:

$KAFKA_HOME/bin/kafka-producer-perf-test.sh --topic test-mirror --num-records 100000000 --record-size=100 --throughput -1 --producer-props bootstrap.servers=localhost:9092

在uReplicator controller所在服务器执行,uReplicator添加管理主题:

curl -X POST -d '{"topic":"test-mirror", "numPartitions":"5"}' http://localhost:9000/topics

在目标kafka所在服务器执行,观察目标kafka主题同步情况:

 # print-offset.sh为MirrorMaker那一节介绍的脚本
 $KAFKA_HOME/test/print-offset.sh test-mirror

源码分析

REST服务

uReplicator controller启动时,创建ControllerRestApplication服务,绑定到-port参数指定的端口。

public class ControllerInstance {
    public ControllerInstance() {
        _component = new Component();
        _controllerRestApp = new ControllerRestApplication(null);
    }

    public void start() throws Exception {
        _component.getServers().add(Protocol.HTTP, Integer.parseInt(_config.getControllerPort()));
        final Context applicationContext = _component.getContext().createChildContext();
        _controllerRestApp.setContext(applicationContext);
        _component.getDefaultHost().attach(_controllerRestApp);
    }
}

ControllerRestApplication提供的服务:

public class ControllerRestApplication extends Application {

  @Override
  public Restlet createInboundRoot() {
    final Router router = new Router(getContext());
    router.setDefaultMatchingMode(Template.MODE_EQUALS);

    // Topic Servlet
    router.attach("/topics", TopicManagementRestletResource.class);
    router.attach("/topics/{topicName}", TopicManagementRestletResource.class);

    // Health Check Servlet
    router.attach("/health", HealthCheckRestletResource.class);

    // Validation Servlet
    router.attach("/validation", ValidationRestletResource.class);
    router.attach("/validation/{option}", ValidationRestletResource.class);

    // Admin Servlet
    router.attach("/admin", AdminRestletResource.class);
    router.attach("/admin/{opt}", AdminRestletResource.class);

    // MirrorMakerManager Servlet
    router.attach("/instances", MirrorMakerManagerRestletResource.class);
    router.attach("/instances/{instanceName}", MirrorMakerManagerRestletResource.class);

    router.attach("/offset", TopicParitionOffsetRestletResource.class);
    router.attach("/offset/{topic}/{partition}", TopicParitionOffsetRestletResource.class);
    router.attach("/noprogress", NoProgressTopicPartitionRestletResource.class);

    return router;
  }
}

其中/instances可以查看uReplicator集群的节点状态。

/topics可以添加删除主题,查询主题的明细,每一个分区由哪一个uReplicator worker消费。

/offset可以查看消费延迟情况,通过读取zookeeper的/consumer/$consumer_group/offsets/$topic/$partition节点信息实现。我本地测试时/consumer下没有数据,所以请求/offset并没有返回信息。

消费消息

uReplicator worker消费消息的逻辑和MirrorMaker基本一致,具体实现参考代码MirrorMakerThread.run。

class MirrorMakerThread(connector: KafkaConnector) {
    override def run() {
        val stream = connector.getStream()
        val iter = stream.iterator()

        while (!exitingOnSendFailure && !shuttingDown && iter.hasNext()) {
            val data = iter.next()
            val records = messageHandler.handle(data)
            val iterRecords = records.iterator()
            while (iterRecords.hasNext) {
                val record = iterRecords.next()
                if (!filterEnabled || needToSend(record, srcCluster, dstCluster, data.offset)) {
                    producer.send(record, data.partition, data.offset)
                }
            }
            maybeFlushAndCommitOffsets(false)
        }
        ...
    }

}

消费者再均衡

uReplicator通过Apache Helix实现故障转移,uReplicator worker实例就是Apache Helix Cluster的Node,uReplicator管理的主题就是Apache Helix Cluster的资源。当我们通过测试章节提到的流程,启动一个controller和一个worker时,可以在zookeeper中执行查询ls -R /testMirrorMaker,查看Apache Helix的信息:

/testMirrorMaker/CONFIGS
/testMirrorMaker/CONFIGS/CLUSTER
/testMirrorMaker/CONFIGS/CLUSTER/testMirrorMaker
/testMirrorMaker/CONFIGS/PARTICIPANT
/testMirrorMaker/CONFIGS/PARTICIPANT/testHelixMirrorMaker01
/testMirrorMaker/CONFIGS/RESOURCE
/testMirrorMaker/CONFIGS/RESOURCE/test-mirror

/testMirrorMaker/INSTANCES
/testMirrorMaker/INSTANCES/testHelixMirrorMaker01
/testMirrorMaker/INSTANCES/testHelixMirrorMaker01/CURRENTSTATES/100d394bcd8000d/test-mirror

/testMirrorMaker/LIVEINSTANCES
/testMirrorMaker/LIVEINSTANCES/testHelixMirrorMaker01
...

当uReplicator worker出现故障时,会通过AutoRebalanceLiveInstanceChangeListener将该worker处理分区交给其他worker,分配策略是优先找空闲的、负载小的worker。

疑问:

所有worker的消费组id是一样的,不然再均衡之后不能从上次消费的offset位置继续消费。但如果所有worker的消费组id一致,不是就会触发kafka的消费者再平衡吗?可能是哪里禁用了这种平衡机制。

kafka.mirrormaker.KafkaConnector消费数据时,并没有使用org.apache.kafka.clients.consumer.KafkaConsumer,而是使用kafka.consumer.KafkaStream#KafkaStream,这个类并不需要指定消费组id。

我的思考

uReplicator如何实现Kafka集群之间的数据同步

uReplicator包括controller和worker节点,controller节点负责将kafka的主题和分区分配给worker节点,worker节点负责将指定的主题和分区数据同步到目标kafka集群中。

uReplicator通过Apache Helix实现容错,Apache Helix依赖zookeeper服务。一个worker节点挂掉后,它负责的主题和分区会交给其他worker节点处理,分配的策略是优先选择空闲的、负载低的节点,这种策略会尽力避免类似kafka消费者组重平衡的全局影响。

uReplicator同步数据的核心逻辑和MirrorMaker基本一致,不同的地方在于uReplicator采用kafka.consumer.KafkaStream#KafkaStream消费数据,MirrorMaker采用org.apache.kafka.clients.consumer.KafkaConsumer消费数据。这也是uReplicator不会受消费者组影响的原因。

MirrorMaker和uReplicator的适用范围

如果数据同步节点较少,比如小于5个,更适合使用MirrorMaker,而不是uReplicator。

uReplicator提供了便利的集群管理功能,但是需要管理zookeeper服务、controller节点和worker节点,复杂度更高。只有当集群规模足够大,管理成本足够高,或者消费者组重平衡代价足够大,才需要使用uReplicator。

至于uReplicator提供的容错功能,MirrorMaker也有,就是消费者组重平衡,这个方案更简洁。