官网文档: http://kafka.apache.org/0110/documentation.html#monitoring
消息引擎系统:
生产者、消费者
批处理、流处理(kafka Streams)
创建一个topic
kafka-topics \--create \--bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 \--replication-factor 3 \--partitions 3 \--topic test_topic
查看topic 列表
kafka-topics --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --list
查看 topic 信息
kafka-topics --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic ylyh_vn --describe
使用生产者
kafka-console-producer \--broker-list kafka1:9092,kafka2:9092,kafka3:9092 \--topic test_topic
使用消费者 (另开终端)
kafka-console-consumer \--bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 \--from-beginning \--topic test_topic
列出所有group
kafka-consumer-groups --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --list
查看组信息
kafka-consumer-groups --bootstrap-server kafka1:9092 --describe --group kafka2kudu_ylyh_cn# 其中依次展示group名称、消费的topic名称、partition id、consumer group最后一次提交的offset、最后提交的生产消息offset、消费offset与生产offset之间的差值、当前消费topic-partition的group成员id(不一定包含hostname)
增加kafka分区
kafka-topics --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic ylyh_cn --alter --partitions 10
查看 topics 参数
kafka-configs --zookeeper zk1:2181,zk2:2181,zk3:2181 --entity-type topics --entity-name ylyh_cn --describe
修改 topics 参数
# 修改数据保留策略kafka-configs --entity-type topics --entity-name ylyh_cn --alter --add-config retention.ms=518400000 --zookeeper zk1:2181,zk2:2181,zk3:2181
常用命令小结
# 启动bin/kafka-server-start.sh config/server.properties &# 停止bin/kafka-server-stop.sh config/server.properties# 创建主题bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test_topic# 查看主题bin/kafka-topics.sh --bootstrap-server localhost:9092 --list# 查看主题信息bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic test_topic --describe# 生产者bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic# 消费者bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic
# 列出消费组bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list# 消费者指定消费组消费bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --consumer-property group.id=testgroup# 删除消费组kafka-consumer-groups.sh --bootstrap-server {kafka连接地址} --delete --group {消费组}
# 查看 topics 参数kafka-configs --zookeeper zk1:2181,zk2:2181,zk3:2181 --entity-type topics --entity-name ylyh_cn --describe修改 topics 参数kafka-configs --entity-type topics --entity-name ylyh_cn --alter --add-config retention.ms=518400000 --zookeeper zk1:2181,zk2:2181,zk3:2181
kafka 分区 offset 调整
--to-earliest:把位移调整到分区当前最小位移--to-latest:把位移调整到分区当前最新位移--to-current:把位移调整到分区当前位移--to-offset <offset>: 把位移调整到指定位移处--shift-by N: 把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动--to-datetime <datetime>:把位移调整到大于给定时间的最早位移处,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000--by-duration <duration>:把位移调整到距离当前时间指定间隔的位移处,duration格式是PnDTnHnMnS,比如PT0H5M0S--from-file <file>:从CSV文件中读取调整策略# --to-earliest 所有分区的位移重置为0,重新消费kafka-consumer-groups --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --reset-offsets --all-topics --to-earliest --execute --group kafka2kudu_ylyh_cn# --to-datetime 重置到某个时间点kafka-consumer-groups --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --reset-offsets --all-topics --to-datetime 2020-07-04T10:00:00.000 --execute --group kafka2kudu_ylyh_# --to-latest 所有分区的位移调整到最新,即LAG为0kafka-consumer-groups --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --reset-offsets --all-topics --to-latest --execute --group kafka2kudu_ylyh_cnkafka-consumer-groups --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --reset-offsets --all-topics --to-offset 10000 --execute --group kafka2kudu_ylyh_cn
/usr/bin/kafka-consumer-groups
#!/bin/bash# Reference: http://stackoverflow.com/questions/59895/can-a-bash-script-tell-what-directory-its-stored-inSOURCE="${BASH_SOURCE[0]}"BIN_DIR="$( dirname "$SOURCE" )"while [ -h "$SOURCE" ]doSOURCE="$(readlink "$SOURCE")"[[ $SOURCE != /* ]] && SOURCE="$DIR/$SOURCE"BIN_DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )"doneBIN_DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )"LIB_DIR=$BIN_DIR/../lib# Autodetect JAVA_HOME if not definedif [ -e $LIB_DIR/../../CDH/lib/bigtop-utils/bigtop-detect-javahome ] ; then. $LIB_DIR/../../CDH/lib/bigtop-utils/bigtop-detect-javahomefiexec $LIB_DIR/kafka/bin/kafka-consumer-groups.sh "$@"
