1、查询
# topic列表查询 --helpbin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --listbin/kafka-topics.sh --bootstrap-server localhost:9092 --list (支持0.9版本+)# 查询topic信息(如分区,副本) --helpbin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181# 消费者组列表查询(支持0.9版本+) --helpbin/kafka-consumer-groups.sh --bootstrap-server 192.100.3.79:9092 --list#查看test-group-1组的详细信息 --helpbin/kafka-consumer-groups.sh --describe --bootstrap-server 192.100.3.79:9092 --group test-group-1#查询一个topic的所有分区的偏移量 -time -1表示最大的位置,-2最小的位置 bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.100.3.79:9092 --time -1 --topic audit-demo
2、主题
#创建一个分区和一个副本名为“ test”的主题:bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic testbin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 1 --topic my-kafka-topic#查看主题列表bin/kafka-topics.sh --list --bootstrap-server ip:9092#查看一个主题的信息bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test#运行生产者,然后在控制台中键入一些消息以发送到服务器kafka-console-producer.sh --broker-list localhost:9092 --topic test#Kafka还有一个命令行使用者,它将消息转储到标准输出 消费主题kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning#删除主题bin/kafka-topics.sh --bootstrap-server 192.100.3.25:9092 --delete --topic my_topic_name#修改主题bin/kafka-topics.sh --bootstrap-server 192.100.3.25:9092 --alter --topic my_topic_name --partitions 40#查看主题bin/kafka-topics.sh --list --zookeeper localhost:2181#查看topic的详细信息,需要使用describe命令kafka-topics.sh --describe --zookeeper ip:2181 --topic test-topic#若不指定topic,则查看所有topic的信息kafka-topics.sh --describe --zookeeper ip:2181
3、生产者
# 生产者 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test# 新生产者(支持0.9版本+) bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties
4、消费者
# 消费,指定组 bin/kafka-console-consumer.sh --bootstrap-server 192.100.3.73:9092 --topic audit-dev --consumer-property group.id=dd --from-beginning bin/kafka-console-consumer.sh --bootstrap-server 192.100.3.73:9092 --topic audit-dev --group ss --from-beginning # 不指定消费组,系统随机分配组,消费的消息数为5条 bin/kafka-console-consumer.sh --bootstrap-server 192.100.3.25:9092 --topic test --from-beginning --max-messages 5 # 新消费者(支持0.9版本+) ,指定消费者配置文件bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties
--bootstrap-server 要连接的服务器--consumer-property <String: 传递用户定义的机制consumer_prop>表单中的属性key=value to消费者。--consumer.config <String: config file> consumer配置属性文件。注意onsumer-property优先于此配置。--enable-systest-events 启用用户的系统测试事件日志生命周期事件除了记录消耗信息。(这是针对系统测试。) --formatter <String: class> 要用于的类的名称格式化kafka邮件显示。(默认值:kafka.tools.DefaultMessageFormatter)--from-beginning 如果消费者还没有已建立的消费补偿从最早的开始日志中出现的消息比最新的消息。--group <String: consumer group id> 消费者的消费者组id。--help Print usage information.--isolation-level <String> 隔离级别<String>设置为read_committed以便筛选出事务性消息没有承诺。设置为read_uncommittedto全部读取信息。 (默认值:读未提交)(default: read_uncommitted)--key-deserializer <String: deserializer for key>--max-messages <Integer: num_messages> 退出前消费消息的最大数量。如果没有设置,消费是持续的。--offset <String: consume offset> 要从中使用的偏移id(非-负数),或“earliest”意思是从一开始,或者“latest”是指从最后(默认值:latest)--partition <Integer: partition> The partition to consume from. Consumption starts from the end of the partition unless '--offset' is specified.--property <String: prop> The properties to initialize the message formatter. Default properties include: print.timestamp=true|false print.key=true|false print.value=true|false key.separator=<key.separator> line.separator=<line.separator> key.deserializer=<key.deserializer> value.deserializer=<value. deserializer> Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key. deserializer.' and 'value. deserializer.' prefixes to configure their deserializers.--skip-message-on-error If there is an error when processing a message, skip it instead of halt.--timeout-ms <Integer: timeout_ms> If specified, exit if no message is available for consumption for the specified interval.--topic <String: topic> The topic id to consume on.--value-deserializer <String: deserializer for values>--whitelist <String: whitelist> Regular expression specifying whitelist of topics to include for consumption.
5、配置
#添加配置kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y#删除配置kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config x#为(user = user1,client-id = clientA)配置自定义配额:kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-name clientAUpdated config for entity: user-principal 'user1', client-id 'clientA'.#为user = user1配置自定义配额:kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1Updated config for entity: user-principal 'user1'.#为user = userA配置默认的客户端ID配额:kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-defaultUpdated config for entity: user-principal 'user1', default client-id.#配置用户的默认配额:kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-defaultUpdated config for entity: default user-principal.#配置客户端ID的默认配额:kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-defaultUpdated config for entity: default client-id.#描述给定(用户,客户端ID)的配额的方法:kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientAConfigs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200#描述给定用户的配额:kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name user1Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200#描述给定客户端ID的配额:kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type clients --entity-name clientAConfigs for client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200#如果未指定实体名称,则将描述指定类型的所有实体。例如,描述所有用户:bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type usersConfigs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200Configs for default user-principal are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200#对于(用户,客户端)类似:kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-type clientsConfigs for user-principal 'user1', default client-id are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
6、连接器
#使用Kafka Connect导入/导出数据connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties