1、查询
# topic列表查询 --help
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list (支持0.9版本+)
# 查询topic信息(如分区,副本) --help
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181
# 消费者组列表查询(支持0.9版本+) --help
bin/kafka-consumer-groups.sh --bootstrap-server 192.100.3.79:9092 --list
#查看test-group-1组的详细信息 --help
bin/kafka-consumer-groups.sh --describe --bootstrap-server 192.100.3.79:9092 --group test-group-1
#查询一个topic的所有分区的偏移量 -time -1表示最大的位置,-2最小的位置
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.100.3.79:9092 --time -1 --topic audit-demo
2、主题
#创建一个分区和一个副本名为“ test”的主题:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 1 --topic my-kafka-topic
#查看主题列表
bin/kafka-topics.sh --list --bootstrap-server ip:9092
#查看一个主题的信息
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
#运行生产者,然后在控制台中键入一些消息以发送到服务器
kafka-console-producer.sh --broker-list localhost:9092 --topic test
#Kafka还有一个命令行使用者,它将消息转储到标准输出 消费主题
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
#删除主题
bin/kafka-topics.sh --bootstrap-server 192.100.3.25:9092 --delete --topic my_topic_name
#修改主题
bin/kafka-topics.sh --bootstrap-server 192.100.3.25:9092 --alter --topic my_topic_name --partitions 40
#查看主题
bin/kafka-topics.sh --list --zookeeper localhost:2181
#查看topic的详细信息,需要使用describe命令
kafka-topics.sh --describe --zookeeper ip:2181 --topic test-topic
#若不指定topic,则查看所有topic的信息
kafka-topics.sh --describe --zookeeper ip:2181
3、生产者
# 生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# 新生产者(支持0.9版本+)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties
4、消费者
# 消费,指定组
bin/kafka-console-consumer.sh --bootstrap-server 192.100.3.73:9092 --topic audit-dev --consumer-property group.id=dd --from-beginning
bin/kafka-console-consumer.sh --bootstrap-server 192.100.3.73:9092 --topic audit-dev --group ss --from-beginning
# 不指定消费组,系统随机分配组,消费的消息数为5条
bin/kafka-console-consumer.sh --bootstrap-server 192.100.3.25:9092 --topic test --from-beginning --max-messages 5
# 新消费者(支持0.9版本+) ,指定消费者配置文件
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties
--bootstrap-server 要连接的服务器
--consumer-property <String: 传递用户定义的机制consumer_prop>表单中的属性key=value to消费者。
--consumer.config <String: config file> consumer配置属性文件。注意onsumer-property优先于此配置。
--enable-systest-events 启用用户的系统测试事件日志生命周期事件除了记录消耗信息。(这是针对系统测试。)
--formatter <String: class> 要用于的类的名称格式化kafka邮件显示。(默认值:kafka.tools.DefaultMessageFormatter)
--from-beginning 如果消费者还没有已建立的消费补偿从最早的开始日志中出现的消息比最新的消息。
--group <String: consumer group id> 消费者的消费者组id。
--help Print usage information.
--isolation-level <String> 隔离级别<String>设置为read_committed以便筛选出事务性消息没有承诺。设置为read_uncommittedto全部读取信息。
(默认值:读未提交)(default: read_uncommitted)
--key-deserializer <String:
deserializer for key>
--max-messages <Integer: num_messages> 退出前消费消息的最大数量。如果没有设置,消费是持续的。
--offset <String: consume offset> 要从中使用的偏移id(非-负数),或“earliest”意思是从一开始,或者“latest”是指从最后(默认值:latest)
--partition <Integer: partition> The partition to consume from.
Consumption starts from the end of
the partition unless '--offset' is
specified.
--property <String: prop> The properties to initialize the
message formatter. Default
properties include:
print.timestamp=true|false
print.key=true|false
print.value=true|false
key.separator=<key.separator>
line.separator=<line.separator>
key.deserializer=<key.deserializer>
value.deserializer=<value.
deserializer>
Users can also pass in customized
properties for their formatter; more
specifically, users can pass in
properties keyed with 'key.
deserializer.' and 'value.
deserializer.' prefixes to configure
their deserializers.
--skip-message-on-error If there is an error when processing a
message, skip it instead of halt.
--timeout-ms <Integer: timeout_ms> If specified, exit if no message is
available for consumption for the
specified interval.
--topic <String: topic> The topic id to consume on.
--value-deserializer <String:
deserializer for values>
--whitelist <String: whitelist> Regular expression specifying
whitelist of topics to include for
consumption.
5、配置
#添加配置
kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y
#删除配置
kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config x
#为(user = user1,client-id = clientA)配置自定义配额:
kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Updated config for entity: user-principal 'user1', client-id 'clientA'.
#为user = user1配置自定义配额:
kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1
Updated config for entity: user-principal 'user1'.
#为user = userA配置默认的客户端ID配额:
kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-default
Updated config for entity: user-principal 'user1', default client-id.
#配置用户的默认配额:
kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-default
Updated config for entity: default user-principal.
#配置客户端ID的默认配额:
kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-default
Updated config for entity: default client-id.
#描述给定(用户,客户端ID)的配额的方法:
kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
#描述给定用户的配额:
kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name user1
Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
#描述给定客户端ID的配额:
kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type clients --entity-name clientA
Configs for client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
#如果未指定实体名称,则将描述指定类型的所有实体。例如,描述所有用户:
bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users
Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
Configs for default user-principal are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
#对于(用户,客户端)类似:
kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-type clients
Configs for user-principal 'user1', default client-id are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
6、连接器
#使用Kafka Connect导入/导出数据
connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties