官网文档: http://kafka.apache.org/0110/documentation.html#monitoring

消息引擎系统:

生产者、消费者
批处理、流处理(kafka Streams)

创建一个topic

  1. kafka-topics \
  2. --create \
  3. --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 \
  4. --replication-factor 3 \
  5. --partitions 3 \
  6. --topic test_topic

查看topic 列表

  1. kafka-topics --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --list

查看 topic 信息

  1. kafka-topics --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic ylyh_vn --describe

使用生产者

  1. kafka-console-producer \
  2. --broker-list kafka1:9092,kafka2:9092,kafka3:9092 \
  3. --topic test_topic

使用消费者 (另开终端)

  1. kafka-console-consumer \
  2. --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 \
  3. --from-beginning \
  4. --topic test_topic

列出所有group

  1. kafka-consumer-groups --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --list

查看组信息

  1. kafka-consumer-groups --bootstrap-server kafka1:9092 --describe --group kafka2kudu_ylyh_cn
  2. # 其中依次展示group名称、消费的topic名称、partition id、consumer group最后一次提交的offset、最后提交的生产消息offset、消费offset与生产offset之间的差值、当前消费topic-partition的group成员id(不一定包含hostname)

增加kafka分区

  1. kafka-topics --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic ylyh_cn --alter --partitions 10

查看 topics 参数

  1. kafka-configs --zookeeper zk1:2181,zk2:2181,zk3:2181 --entity-type topics --entity-name ylyh_cn --describe

修改 topics 参数

  1. # 修改数据保留策略
  2. kafka-configs --entity-type topics --entity-name ylyh_cn --alter --add-config retention.ms=518400000 --zookeeper zk1:2181,zk2:2181,zk3:2181

常用命令小结

  1. # 启动
  2. bin/kafka-server-start.sh config/server.properties &
  3. # 停止
  4. bin/kafka-server-stop.sh config/server.properties
  5. # 创建主题
  6. bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test_topic
  7. # 查看主题
  8. bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
  9. # 查看主题信息
  10. bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic test_topic --describe
  11. # 生产者
  12. bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic
  13. # 消费者
  14. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic
  1. # 列出消费组
  2. bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
  3. # 消费者指定消费组消费
  4. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --consumer-property group.id=testgroup
  5. # 删除消费组
  6. kafka-consumer-groups.sh --bootstrap-server {kafka连接地址} --delete --group {消费组}
  1. # 查看 topics 参数
  2. kafka-configs --zookeeper zk1:2181,zk2:2181,zk3:2181 --entity-type topics --entity-name ylyh_cn --describe
  3. 修改 topics 参数
  4. kafka-configs --entity-type topics --entity-name ylyh_cn --alter --add-config retention.ms=518400000 --zookeeper zk1:2181,zk2:2181,zk3:2181

kafka 分区 offset 调整

  1. --to-earliest:把位移调整到分区当前最小位移
  2. --to-latest:把位移调整到分区当前最新位移
  3. --to-current:把位移调整到分区当前位移
  4. --to-offset <offset> 把位移调整到指定位移处
  5. --shift-by N 把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动
  6. --to-datetime <datetime>:把位移调整到大于给定时间的最早位移处,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000
  7. --by-duration <duration>:把位移调整到距离当前时间指定间隔的位移处,duration格式是PnDTnHnMnS,比如PT0H5M0S
  8. --from-file <file>:从CSV文件中读取调整策略
  9. # --to-earliest 所有分区的位移重置为0,重新消费
  10. kafka-consumer-groups --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --reset-offsets --all-topics --to-earliest --execute --group kafka2kudu_ylyh_cn
  11. # --to-datetime 重置到某个时间点
  12. kafka-consumer-groups --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --reset-offsets --all-topics --to-datetime 2020-07-04T10:00:00.000 --execute --group kafka2kudu_ylyh_
  13. # --to-latest 所有分区的位移调整到最新,即LAG为0
  14. kafka-consumer-groups --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --reset-offsets --all-topics --to-latest --execute --group kafka2kudu_ylyh_cn
  15. kafka-consumer-groups --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --reset-offsets --all-topics --to-offset 10000 --execute --group kafka2kudu_ylyh_cn

/usr/bin/kafka-consumer-groups

  1. #!/bin/bash
  2. # Reference: http://stackoverflow.com/questions/59895/can-a-bash-script-tell-what-directory-its-stored-in
  3. SOURCE="${BASH_SOURCE[0]}"
  4. BIN_DIR="$( dirname "$SOURCE" )"
  5. while [ -h "$SOURCE" ]
  6. do
  7. SOURCE="$(readlink "$SOURCE")"
  8. [[ $SOURCE != /* ]] && SOURCE="$DIR/$SOURCE"
  9. BIN_DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
  10. done
  11. BIN_DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
  12. LIB_DIR=$BIN_DIR/../lib
  13. # Autodetect JAVA_HOME if not defined
  14. if [ -e $LIB_DIR/../../CDH/lib/bigtop-utils/bigtop-detect-javahome ] ; then
  15. . $LIB_DIR/../../CDH/lib/bigtop-utils/bigtop-detect-javahome
  16. fi
  17. exec $LIB_DIR/kafka/bin/kafka-consumer-groups.sh "$@"