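The existing Kafka cluster has three broker nodes (host1, host2, host3). To increase data-processing capacity, the cluster is being expanded to six brokers.
    I. Rack additional physical servers to provide more resources
    The three new broker nodes are host4, host5, and host6.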

    II. Deploy Kafka on the three new nodes
    This is not the focus of this article, so it is skipped here.

    III. Redistribute the partitions of existing topics

    1. List all topics currently available in the cluster
    ./kafka-topics.sh --list --zookeeper ip:port

    lcf-201612201649
    test-for-sys-monitor

    2. View the details of a specific topic

    ./kafka-topics.sh --describe --zookeeper 192.168.1.92:2181 --topic lcf-201612201649
    Topic:lcf-201612201649 PartitionCount:24 ReplicationFactor:3 Configs:
    Topic: lcf-201612201649 Partition: 0 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
    Topic: lcf-201612201649 Partition: 1 Leader: 2 Replicas: 2,1,3 Isr: 3,1,2
    Topic: lcf-201612201649 Partition: 2 Leader: 3 Replicas: 3,2,1 Isr: 3,1,2
    Topic: lcf-201612201649 Partition: 3 Leader: 1 Replicas: 1,2,3 Isr: 1,3,2
    Topic: lcf-201612201649 Partition: 4 Leader: 2 Replicas: 2,3,1 Isr: 3,1,2
    Topic: lcf-201612201649 Partition: 5 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
    Topic: lcf-201612201649 Partition: 6 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
    Topic: lcf-201612201649 Partition: 7 Leader: 2 Replicas: 2,1,3 Isr: 1,3,2
    Topic: lcf-201612201649 Partition: 8 Leader: 3 Replicas: 3,2,1 Isr: 3,1,2
    Topic: lcf-201612201649 Partition: 9 Leader: 1 Replicas: 1,2,3 Isr: 1,3,2
    Topic: lcf-201612201649 Partition: 10 Leader: 2 Replicas: 2,3,1 Isr: 3,1,2
    Topic: lcf-201612201649 Partition: 11 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
    Topic: lcf-201612201649 Partition: 12 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
    Topic: lcf-201612201649 Partition: 13 Leader: 2 Replicas: 2,1,3 Isr: 3,1,2
    Topic: lcf-201612201649 Partition: 14 Leader: 3 Replicas: 3,2,1 Isr: 3,1,2
    Topic: lcf-201612201649 Partition: 15 Leader: 1 Replicas: 1,2,3 Isr: 1,3,2
    Topic: lcf-201612201649 Partition: 16 Leader: 2 Replicas: 2,3,1 Isr: 3,1,2
    Topic: lcf-201612201649 Partition: 17 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
    Topic: lcf-201612201649 Partition: 18 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
    Topic: lcf-201612201649 Partition: 19 Leader: 2 Replicas: 2,1,3 Isr: 1,3,2
    Topic: lcf-201612201649 Partition: 20 Leader: 3 Replicas: 3,2,1 Isr: 3,1,2
    Topic: lcf-201612201649 Partition: 21 Leader: 1 Replicas: 1,2,3 Isr: 1,3,2
    Topic: lcf-201612201649 Partition: 22 Leader: 2 Replicas: 2,3,1 Isr: 3,1,2
    Topic: lcf-201612201649 Partition: 23 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2

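    3. Migrate partition logs across the Kafka cluster
    The goal is to take the topic that currently lives on three nodes (24 partitions, 3 replicas) and redistribute all of its partitions across all six nodes.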

    (1) First create the topic-to-move.json file

    cat << EOF > topic-to-move.json
    {"topics": [{"topic": "lcf-201612201649"}],
    "version":1
    }
    EOF
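When several topics have to be moved, the same file can be generated instead of typed by hand. The following is a minimal sketch that assumes only the JSON layout shown in the heredoc above; the helper name `build_topics_to_move` is hypothetical and not part of Kafka.

```python
import json


def build_topics_to_move(topics):
    """Return the JSON text for --topics-to-move-json-file (hypothetical helper)."""
    # Layout mirrors the heredoc above: a list of {"topic": name} plus "version": 1.
    doc = {"topics": [{"topic": name} for name in topics], "version": 1}
    return json.dumps(doc, indent=2)


if __name__ == "__main__":
    print(build_topics_to_move(["lcf-201612201649"]))
```

Redirect the printed text into the JSON file that is passed to `--generate`.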
    

    (2) Generate a migration plan with --generate

    [testuser@c4 bin]$ ./bin/kafka-reassign-partitions.sh --zookeeper 192.168.1.92:2181 --topics-to-move-json-file ./plans/topic-to-move.json --broker-list "1,2,3,4,5,6" --generate
    Current partition replica assignment
    {"version":1,"partitions":[{"topic":"lcf-201612201649","partition":1,"replicas":[2,1,3]},{"topic":"lcf-201612201649","partition":8,"replicas":[3,2,1]},{"topic":"lcf-201612201649","partition":19,"replicas":[2,1,3]},{"topic":"lcf-201612201649","partition":15,"replicas":[1,2,3]},{"topic":"lcf-201612201649","partition":18,"replicas":[1,3,2]},{"topic":"lcf-201612201649","partition":13,"replicas":[2,1,3]},{"topic":"lcf-201612201649","partition":0,"replicas":[1,3,2]},{"topic":"lcf-201612201649","partition":10,"replicas":[2,3,1]},{"topic":"lcf-201612201649","partition":5,"replicas":[3,1,2]},{"topic":"lcf-201612201649","partition":12,"replicas":[1,3,2]},{"topic":"lcf-201612201649","partition":17,"replicas":[3,1,2]},{"topic":"lcf-201612201649","partition":9,"replicas":[1,2,3]},{"topic":"lcf-201612201649","partition":7,"replicas":[2,1,3]},{"topic":"lcf-201612201649","partition":20,"replicas":[3,2,1]},{"topic":"lcf-201612201649","partition":23,"replicas":[3,1,2]},{"topic":"lcf-201612201649","partition":3,"replicas":[1,2,3]},{"topic":"lcf-201612201649","partition":2,"replicas":[3,2,1]},{"topic":"lcf-201612201649","partition":4,"replicas":[2,3,1]},{"topic":"lcf-201612201649","partition":11,"replicas":[3,1,2]},{"topic":"lcf-201612201649","partition":6,"replicas":[1,3,2]},{"topic":"lcf-201612201649","partition":14,"replicas":[3,2,1]},{"topic":"lcf-201612201649","partition":22,"replicas":[2,3,1]},{"topic":"lcf-201612201649","partition":16,"replicas":[2,3,1]},{"topic":"lcf-201612201649","partition":21,"replicas":[1,2,3]}]}
    Proposed partition reassignment configuration
    {"version":1,"partitions":[{"topic":"lcf-201612201649","partition":1,"replicas":[4,1,2]},{"topic":"lcf-201612201649","partition":15,"replicas":[6,5,1]},{"topic":"lcf-201612201649","partition":8,"replicas":[5,3,4]},{"topic":"lcf-201612201649","partition":19,"replicas":[4,5,6]},{"topic":"lcf-201612201649","partition":13,"replicas":[4,3,5]},{"topic":"lcf-201612201649","partition":18,"replicas":[3,4,5]},{"topic":"lcf-201612201649","partition":0,"replicas":[3,6,1]},{"topic":"lcf-201612201649","partition":10,"replicas":[1,5,6]},{"topic":"lcf-201612201649","partition":5,"replicas":[2,5,6]},{"topic":"lcf-201612201649","partition":12,"replicas":[3,2,4]},{"topic":"lcf-201612201649","partition":9,"replicas":[6,4,5]},{"topic":"lcf-201612201649","partition":17,"replicas":[2,1,3]},{"topic":"lcf-201612201649","partition":7,"replicas":[4,2,3]},{"topic":"lcf-201612201649","partition":20,"replicas":[5,6,1]},{"topic":"lcf-201612201649","partition":23,"replicas":[2,3,4]},{"topic":"lcf-201612201649","partition":3,"replicas":[6,3,4]},{"topic":"lcf-201612201649","partition":4,"replicas":[1,4,5]},{"topic":"lcf-201612201649","partition":2,"replicas":[5,2,3]},{"topic":"lcf-201612201649","partition":11,"replicas":[2,6,1]},{"topic":"lcf-201612201649","partition":6,"replicas":[3,1,2]},{"topic":"lcf-201612201649","partition":22,"replicas":[1,2,3]},{"topic":"lcf-201612201649","partition":14,"replicas":[5,4,6]},{"topic":"lcf-201612201649","partition":16,"replicas":[1,6,2]},{"topic":"lcf-201612201649","partition":21,"replicas":[6,1,2]}]}
    [testuser@c4 bin]$
    

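    Note: this generates a plan for moving the specified topic onto brokers 1, 2, 3, 4, 5, and 6. The output contains both the current assignment and the proposed new assignment.
    Save both parts of the output as JSON files: back up the current assignment as backup.json (it is needed for a rollback), and save the proposed assignment as expand-cluster-reassignment.json (the commands below refer to this plan file as ./plans/reassignment-lcf-201612201649.json).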
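Splitting the output by hand is error-prone because each JSON document is one very long line. The sketch below shows one way to separate the two parts, assuming the output looks like the transcript above (a header line followed by a single-line JSON document); the exact layout may differ between Kafka versions, and the function names and file paths are hypothetical.

```python
import json

CURRENT_MARKER = "Current partition replica assignment"
PROPOSED_MARKER = "Proposed partition reassignment configuration"


def split_generate_output(text):
    """Return (current, proposed) plans parsed from captured --generate output."""
    lines = [line.strip() for line in text.splitlines()]
    plans = {}
    for marker in (CURRENT_MARKER, PROPOSED_MARKER):
        start = lines.index(marker) + 1  # ValueError if the header is missing
        # The JSON document is the first non-empty line after the header.
        body = next(line for line in lines[start:] if line)
        plans[marker] = json.loads(body)
    return plans[CURRENT_MARKER], plans[PROPOSED_MARKER]


def save_plans(text, backup_path, plan_path):
    """Write the current assignment (rollback copy) and the proposed plan to disk."""
    current, proposed = split_generate_output(text)
    with open(backup_path, "w") as f:
        json.dump(current, f)
    with open(plan_path, "w") as f:
        json.dump(proposed, f)
```

Capture the command output to a file first, read it, and pass the text to `save_plans` together with the two target paths.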

    (3) Execute the migration plan with --execute

    [testuser@c4 kafka]$ ./bin/kafka-reassign-partitions.sh --zookeeper 192.168.1.92:2181 --reassignment-json-file ./plans/reassignment-lcf-201612201649.json --execute
    Current partition replica assignment
    {"version":1,"partitions":[{"topic":"lcf-201612201649","partition":1,"replicas":[2,1,3]},{"topic":"lcf-201612201649","partition":8,"replicas":[3,2,1]},{"topic":"lcf-201612201649","partition":19,"replicas":[2,1,3]},{"topic":"lcf-201612201649","partition":15,"replicas":[1,2,3]},{"topic":"lcf-201612201649","partition":18,"replicas":[1,3,2]},{"topic":"lcf-201612201649","partition":13,"replicas":[2,1,3]},{"topic":"lcf-201612201649","partition":0,"replicas":[1,3,2]},{"topic":"lcf-201612201649","partition":10,"replicas":[2,3,1]},{"topic":"lcf-201612201649","partition":5,"replicas":[3,1,2]},{"topic":"lcf-201612201649","partition":12,"replicas":[1,3,2]},{"topic":"lcf-201612201649","partition":17,"replicas":[3,1,2]},{"topic":"lcf-201612201649","partition":9,"replicas":[1,2,3]},{"topic":"lcf-201612201649","partition":7,"replicas":[2,1,3]},{"topic":"lcf-201612201649","partition":20,"replicas":[3,2,1]},{"topic":"lcf-201612201649","partition":23,"replicas":[3,1,2]},{"topic":"lcf-201612201649","partition":3,"replicas":[1,2,3]},{"topic":"lcf-201612201649","partition":2,"replicas":[3,2,1]},{"topic":"lcf-201612201649","partition":4,"replicas":[2,3,1]},{"topic":"lcf-201612201649","partition":11,"replicas":[3,1,2]},{"topic":"lcf-201612201649","partition":6,"replicas":[1,3,2]},{"topic":"lcf-201612201649","partition":14,"replicas":[3,2,1]},{"topic":"lcf-201612201649","partition":22,"replicas":[2,3,1]},{"topic":"lcf-201612201649","partition":16,"replicas":[2,3,1]},{"topic":"lcf-201612201649","partition":21,"replicas":[1,2,3]}]}

    Save this to use as the --reassignment-json-file option during rollback
    Successfully started reassignment of partitions {"version":1,"partitions":[{"topic":"lcf-201612201649","partition":12,"replicas":[3,2,4]},{"topic":"lcf-201612201649","partition":17,"replicas":[2,1,3]},{"topic":"lcf-201612201649","partition":8,"replicas":[5,3,4]},{"topic":"lcf-201612201649","partition":7,"replicas":[4,2,3]},{"topic":"lcf-201612201649","partition":23,"replicas":[2,3,4]},{"topic":"lcf-201612201649","partition":4,"replicas":[1,4,5]},{"topic":"lcf-201612201649","partition":19,"replicas":[4,5,6]},{"topic":"lcf-201612201649","partition":2,"replicas":[5,2,3]},{"topic":"lcf-201612201649","partition":20,"replicas":[5,6,1]},{"topic":"lcf-201612201649","partition":11,"replicas":[2,6,1]},{"topic":"lcf-201612201649","partition":16,"replicas":[1,6,2]},{"topic":"lcf-201612201649","partition":5,"replicas":[2,5,6]},{"topic":"lcf-201612201649","partition":14,"replicas":[5,4,6]},{"topic":"lcf-201612201649","partition":10,"replicas":[1,5,6]},{"topic":"lcf-201612201649","partition":18,"replicas":[3,4,5]},{"topic":"lcf-201612201649","partition":22,"replicas":[1,2,3]},{"topic":"lcf-201612201649","partition":15,"replicas":[6,5,1]},{"topic":"lcf-201612201649","partition":1,"replicas":[4,1,2]},{"topic":"lcf-201612201649","partition":6,"replicas":[3,1,2]},{"topic":"lcf-201612201649","partition":21,"replicas":[6,1,2]},{"topic":"lcf-201612201649","partition":13,"replicas":[4,3,5]},{"topic":"lcf-201612201649","partition":0,"replicas":[3,6,1]},{"topic":"lcf-201612201649","partition":9,"replicas":[6,4,5]},{"topic":"lcf-201612201649","partition":3,"replicas":[6,3,4]}]}
    

    (4) Verify the migration result with --verify

    [testuser@c4 kafka]$ ./bin/kafka-reassign-partitions.sh --zookeeper 192.168.1.92:2181 --reassignment-json-file ./plans/reassignment-lcf-201612201649.json --verify
    

    Check the result of the redistribution:

    ./bin/kafka-topics.sh --describe --zookeeper 192.168.1.92:2181 --topic lcf-201612201649
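With 24 partitions it is hard to judge from the raw listing whether the replicas are spread evenly. The sketch below counts replicas per broker from captured `--describe` output. It assumes the row format shown earlier in this article (`Partition: N Leader: N Replicas: a,b,c`); the function name is hypothetical.

```python
import re
from collections import Counter

# Matches partition rows such as:
#   Topic: t Partition: 0 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
ROW = re.compile(r"Partition:\s*(\d+)\s+Leader:\s*(-?\d+)\s+Replicas:\s*([\d,]+)")


def replicas_per_broker(describe_output):
    """Count how many partition replicas each broker ID holds."""
    counts = Counter()
    for match in ROW.finditer(describe_output):
        for broker in match.group(3).split(","):
            counts[int(broker)] += 1
    return counts
```

After a successful expansion, all six broker IDs should appear in the result with similar counts.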
    

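    If the migration leaves partition leaders distributed in a way that no longer matches the preferred-replica assignment, the leaders can be rebalanced manually.
    Note: when migrating partitions, it is best to keep one replica of each partition at its original location (disk) at first, so that normal production and consumption are not affected. A partial migration of this kind allows production and consumption to continue normally.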
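To act on this note, it helps to know which partitions a plan would move completely. In the plan generated earlier, for example, partition 19 goes from replicas [2,1,3] to [4,5,6], so none of its replicas stays in place. The sketch below lists such partitions from the two JSON documents (current and proposed); the function name is hypothetical.

```python
def fully_moved_partitions(current, proposed):
    """List (topic, partition) pairs whose new replicas share no broker with the old ones."""
    old = {
        (p["topic"], p["partition"]): set(p["replicas"])
        for p in current["partitions"]
    }
    moved = []
    for p in proposed["partitions"]:
        key = (p["topic"], p["partition"])
        # An empty intersection means every replica of this partition is relocated.
        if key in old and not old[key] & set(p["replicas"]):
            moved.append(key)
    return sorted(moved)
```

If the list is not empty, the proposed plan can be edited by hand so that each listed partition keeps one of its original brokers.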

    (5) Maintaining leader balance in the Kafka cluster

    ./kafka-preferred-replica-election.sh --zookeeper ip:port
    

    Alternatively, set the following parameter in the broker configuration file:

    auto.leader.rebalance.enable=true
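Whether a leader election is needed at all can be checked from `--describe` output: the preferred replica of a partition is the first broker in its replica list, so any row whose leader differs from that broker is out of balance. A minimal sketch, assuming the row format shown earlier in this article; the function name is hypothetical.

```python
import re

# Matches partition rows such as:
#   Topic: t Partition: 0 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
ROW = re.compile(r"Partition:\s*(\d+)\s+Leader:\s*(-?\d+)\s+Replicas:\s*([\d,]+)")


def non_preferred_leaders(describe_output):
    """Return (partition, leader, preferred) for rows whose leader is not the first replica."""
    result = []
    for m in ROW.finditer(describe_output):
        partition, leader = int(m.group(1)), int(m.group(2))
        preferred = int(m.group(3).split(",")[0])
        if leader != preferred:
            result.append((partition, leader, preferred))
    return result
```

An empty result means every partition is already led by its preferred replica.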
    

    Preparing for scale-in
    1. Edit the topics-to-move.json file created earlier and add the system-generated __consumer_offsets topic
    [root@cdh01 ~]# vim topics-to-move.json
    {"topics": [{"topic": "test"},{"topic": "__consumer_offsets"},
    {"topic": "test1"}
    ],
    "version":1
    }
    2. Generate a migration plan again. Here only brokers 121, 124, and 126 are selected; then replace 126 with 125 in the generated plan and save it. This moves all of the data on 126 over to 125.
    kafka-reassign-partitions --zookeeper cdh01.hadoop.com:2181 --topics-to-move-json-file topics-to-move.json --broker-list "121,124,126" --generate
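A plain text search-and-replace of 126 would also hit a partition number or any other field that happens to contain 126. The sketch below performs the swap only inside the replica lists of the parsed plan; the function name is hypothetical, and the guard against duplicates is an added safety check rather than something the original procedure describes.

```python
def replace_broker(plan, old_id, new_id):
    """Return a copy of the plan with old_id replaced by new_id in every replica list."""
    partitions = []
    for p in plan["partitions"]:
        if old_id in p["replicas"] and new_id in p["replicas"]:
            # Swapping would list new_id twice, which is not a valid replica set.
            raise ValueError(
                f"{p['topic']}-{p['partition']} already has broker {new_id}"
            )
        replicas = [new_id if b == old_id else b for b in p["replicas"]]
        partitions.append({**p, "replicas": replicas})
    return {**plan, "partitions": partitions}
```

For the case described here, load the proposed plan with `json.load`, call `replace_broker(plan, 126, 125)`, and write the result to the file used with `--execute`.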

    3. Run the reassignment to perform the migration
    kafka-reassign-partitions --zookeeper cdh01.hadoop.com:2181 --reassignment-json-file newkafka.json --execute

    4. Verify that the migration has completed
    kafka-reassign-partitions --zookeeper cdh01.hadoop.com:2181 --reassignment-json-file newkafka.json --verify

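    5. On the broker that is to be removed, you can also confirm that the topic data has been migrated away
    Scaling in the Kafka cluster
    With the preparation above complete, the Kafka cluster can now be scaled in.
    6. Take the topic offline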

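    Summary
    1. A Kafka cluster can be scaled out and in by adding and removing brokers through CM (Cloudera Manager).
    2. After a Kafka cluster is scaled out, the partitions of existing topics are not automatically rebalanced onto the new disks. Use kafka-reassign-partitions to rebalance the data: generate a plan with the command first, then execute it. A migration plan can also be edited by hand and then executed.
    3. Partitions of a newly created topic are placed per disk: each one goes to the disk that currently holds the fewest partitions.
    4. Before scaling in, the topic data on the broker being removed must be moved off it. This can also be done with kafka-reassign-partitions: edit the migration plan by hand, then execute it with the command.
    5. With kafka-reassign-partitions, only the topics on the specified broker IDs take part in the partition reassignment. The migration plan can be written and modified by hand to suit your needs.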
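A hand-edited plan is easy to get wrong, so a quick sanity check before running --execute can save a rollback. The sketch below is one possible check, assuming the replication factor is meant to stay unchanged and that the set of target brokers is known; the function name and the specific checks are illustrative, not something the Kafka tooling provides.

```python
def validate_plan(plan, allowed_brokers, replication_factor):
    """Return a list of human-readable problems found in a reassignment plan."""
    problems = []
    allowed = set(allowed_brokers)
    for p in plan["partitions"]:
        name = f"{p['topic']}-{p['partition']}"
        replicas = p["replicas"]
        if len(replicas) != replication_factor:
            problems.append(
                f"{name}: expected {replication_factor} replicas, got {len(replicas)}"
            )
        if len(set(replicas)) != len(replicas):
            problems.append(f"{name}: duplicate broker in {replicas}")
        unknown = sorted(set(replicas) - allowed)
        if unknown:
            problems.append(f"{name}: brokers not in the target set: {unknown}")
    return problems
```

An empty list means the plan passed these checks; it does not guarantee that the reassignment itself will succeed.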