1、查询

  1. # topic列表查询 --help
  2. bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
  3. bin/kafka-topics.sh --bootstrap-server localhost:9092 --list (支持0.9版本+)
  4. # 查询topic信息(如分区,副本) --help
  5. bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181
  6. # 消费者组列表查询(支持0.9版本+) --help
  7. bin/kafka-consumer-groups.sh --bootstrap-server 192.100.3.79:9092 --list
  8. #查看test-group-1组的详细信息 --help
  9. bin/kafka-consumer-groups.sh --describe --bootstrap-server 192.100.3.79:9092 --group test-group-1
  10. #查询一个topic的所有分区的偏移量 -time -1表示最大的位置,-2最小的位置
  11. bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.100.3.79:9092 --time -1 --topic audit-demo

2、主题

  1. #创建一个分区和一个副本名为“ test”的主题:
  2. bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
  3. bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 1 --topic my-kafka-topic
  4. #查看主题列表
  5. bin/kafka-topics.sh --list --bootstrap-server ip:9092
  6. #查看一个主题的信息
  7. bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
  8. #运行生产者,然后在控制台中键入一些消息以发送到服务器
  9. kafka-console-producer.sh --broker-list localhost:9092 --topic test
  10. #Kafka还有一个命令行使用者,它将消息转储到标准输出 消费主题
  11. kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
  12. #删除主题
  13. bin/kafka-topics.sh --bootstrap-server 192.100.3.25:9092 --delete --topic my_topic_name
  14. #修改主题
  15. bin/kafka-topics.sh --bootstrap-server 192.100.3.25:9092 --alter --topic my_topic_name --partitions 40
  16. #查看主题
  17. bin/kafka-topics.sh --list --zookeeper localhost:2181
  18. #查看topic的详细信息,需要使用describe命令
  19. kafka-topics.sh --describe --zookeeper ip:2181 --topic test-topic
  20. #若不指定topic,则查看所有topic的信息
  21. kafka-topics.sh --describe --zookeeper ip:2181

3、生产者

  1. # 生产者
  2. bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
  3. # 新生产者(支持0.9版本+)
  4. bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties

4、消费者

  1. # 消费,指定组
  2. bin/kafka-console-consumer.sh --bootstrap-server 192.100.3.73:9092 --topic audit-dev --consumer-property group.id=dd --from-beginning
  3. bin/kafka-console-consumer.sh --bootstrap-server 192.100.3.73:9092 --topic audit-dev --group ss --from-beginning
  4. # 不指定消费组,系统随机分配组,消费的消息数为5条
  5. bin/kafka-console-consumer.sh --bootstrap-server 192.100.3.25:9092 --topic test --from-beginning --max-messages 5
  6. # 新消费者(支持0.9版本+) ,指定消费者配置文件
  7. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties
  1. --bootstrap-server 要连接的服务器
  2. --consumer-property <String: 传递用户定义的机制consumer_prop>表单中的属性key=value to消费者。
  3. --consumer.config <String: config file> consumer配置属性文件。注意onsumer-property优先于此配置。
  4. --enable-systest-events 启用用户的系统测试事件日志生命周期事件除了记录消耗信息。(这是针对系统测试。)
  5. --formatter <String: class> 要用于的类的名称格式化kafka邮件显示。(默认值:kafka.tools.DefaultMessageFormatter
  6. --from-beginning 如果消费者还没有已建立的消费补偿从最早的开始日志中出现的消息比最新的消息。
  7. --group <String: consumer group id> 消费者的消费者组id
  8. --help Print usage information.
  9. --isolation-level <String> 隔离级别<String>设置为read_committed以便筛选出事务性消息没有承诺。设置为read_uncommittedto全部读取信息。
  10. (默认值:读未提交)(default: read_uncommitted)
  11. --key-deserializer <String:
  12. deserializer for key>
  13. --max-messages <Integer: num_messages> 退出前消费消息的最大数量。如果没有设置,消费是持续的。
  14. --offset <String: consume offset> 要从中使用的偏移id(非-负数),或“earliest”意思是从一开始,或者“latest”是指从最后(默认值:latest
  15. --partition <Integer: partition> The partition to consume from.
  16. Consumption starts from the end of
  17. the partition unless '--offset' is
  18. specified.
  19. --property <String: prop> The properties to initialize the
  20. message formatter. Default
  21. properties include:
  22. print.timestamp=true|false
  23. print.key=true|false
  24. print.value=true|false
  25. key.separator=<key.separator>
  26. line.separator=<line.separator>
  27. key.deserializer=<key.deserializer>
  28. value.deserializer=<value.
  29. deserializer>
  30. Users can also pass in customized
  31. properties for their formatter; more
  32. specifically, users can pass in
  33. properties keyed with 'key.
  34. deserializer.' and 'value.
  35. deserializer.' prefixes to configure
  36. their deserializers.
  37. --skip-message-on-error If there is an error when processing a
  38. message, skip it instead of halt.
  39. --timeout-ms <Integer: timeout_ms> If specified, exit if no message is
  40. available for consumption for the
  41. specified interval.
  42. --topic <String: topic> The topic id to consume on.
  43. --value-deserializer <String:
  44. deserializer for values>
  45. --whitelist <String: whitelist> Regular expression specifying
  46. whitelist of topics to include for
  47. consumption.

5、配置

  1. #添加配置
  2. kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y
  3. #删除配置
  4. kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config x
  5. #为(user = user1,client-id = clientA)配置自定义配额:
  6. kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
  7. Updated config for entity: user-principal 'user1', client-id 'clientA'.
  8. #为user = user1配置自定义配额:
  9. kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1
  10. Updated config for entity: user-principal 'user1'.
  11. #为user = userA配置默认的客户端ID配额:
  12. kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-default
  13. Updated config for entity: user-principal 'user1', default client-id.
  14. #配置用户的默认配额:
  15. kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-default
  16. Updated config for entity: default user-principal.
  17. #配置客户端ID的默认配额:
  18. kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-default
  19. Updated config for entity: default client-id.
  20. #描述给定(用户,客户端ID)的配额的方法:
  21. kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
  22. Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
  23. #描述给定用户的配额:
  24. kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name user1
  25. Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
  26. #描述给定客户端ID的配额:
  27. kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type clients --entity-name clientA
  28. Configs for client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
  29. #如果未指定实体名称,则将描述指定类型的所有实体。例如,描述所有用户:
  30. bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users
  31. Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
  32. Configs for default user-principal are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
  33. #对于(用户,客户端)类似:
  34. kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-type clients
  35. Configs for user-principal 'user1', default client-id are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
  36. Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200

6、连接器

  1. #使用Kafka Connect导入/导出数据
  2. connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties