Kafka安装 -单节点-单broker

Kafka架构

  • producer
    生产者

  • consumer
    消费者

  • broker
    存储数据

  • topic
    主题,标签。 这个数据是给谁消费的。

举个例子:母亲做馒头,孩子吃馒头,那么做馒头的母亲就是producer,孩子吃馒头就是consumer,做好的馒头放到篮子里等着孩子吃那么篮子就是broker,母亲有两个孩子,大孩子吃大馒头小孩子吃小馒头,那么给大馒头和小馒头各打一个标签,大孩子看到大馒头的标签就吃大馒头,小孩子看到小馒头的标签就吃小馒头

  1. 下载Kafka

  2. 解压

  3. 设置环境变量

$KAFKA_HOME/conf/server.properties

broker.id=0

listeners

host.name

log.dirs 这个路径不要写/tmp下不然一重启就没了可以自定义一个路径比如:/app/tmp/kafka-logs

zookeeper.connect:zk的地址 ip:port

启动kafka(先启动zookeeper)

kafka-server-start.sh

USAGE : /app/kafka_2.11-0.9.0.0/bin/kafka-server-start.sh [-daemon] server.properties [—override property=value]*

启动命令:

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

  1. # 后台启动
  2. -daemon

jps一下看看是否多了一个进程

创建topic

kafka-topics.sh —create —zookeeper localhost:2181 —replication-factor 1 —partitions 1 —topic hello_topic

  1. #创建一个topic
  2. kafka-topics.sh --create
  3. #指向zookeeper的地址
  4. --zookeeper localhost:2181
  5. # 副本系数是1
  6. --replication-factor 1
  7. # 一个分区
  8. --partitions 1
  9. #topic的名称是hello_topic
  10. --topic hello_topic

查看所有topic

kafka-topics.sh —list —zookeeper localhost:2181

发送消息

kafka-console-producer.sh —broker-list localhost:9092 —topic hello_topic

  1. #**** 注意这里:这个9092是$KAFKA_HOME/conf/server.properties这里配置的listeners*****
  2. --broker-list localhost:9092

消费消息:broker

kafka-console-consumer.sh —zookeeper localhost:2181 —topic hello_topic —from-beginning

  1. # 此处的端口2181 别和发送消息搞混了, 这个2181是zookeeper的端口
  2. --zookeeper localhost:2181

0.9之后

  1. kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic location --from-beginning

生产消息然后查看是否消费

  1. # ******生产*****
  2. [root@10-9-10-25 config]# kafka-console-producer.sh --broker-list localhost:9092 --topic hello_topic
  3. 111
  4. 11111
  5. 111111
  6. hello
  7. #******消费*******
  8. [root@10-9-10-25 /]# kafka-console-consumer.sh --zookeeper localhost:2181 --topic hello_topic --from-beginning
  9. 111
  10. 11111
  11. 111111
  12. hello

此时,如果把消费关了ctrl+c ,再次运行消费消息的命令会看到之前的数据又出现了

  1. [root@10-9-10-25 /]# kafka-console-consumer.sh --zookeeper localhost:2181 --topic hello_topic --from-beginning
  2. 111
  3. 11111
  4. 111111
  5. hello
  6. ^CProcessed a total of 5 messages
  7. [root@10-9-10-25 /]# kafka-console-consumer.sh --zookeeper localhost:2181 --topic hello_topic --from-beginning
  8. 111
  9. 11111
  10. 111111
  11. hello

=========================

单节点多broker

假如都在一台机器上

在conf目录下复制3份 server.properties

  1. [root@10-9-10-25 config]# cp server.properties server1.properties
  2. [root@10-9-10-25 config]# cp server.properties server2.properties
  3. [root@10-9-10-25 config]# cp server.properties server3.properties

分别编辑server123

broker.id=0 broker.id=1
listeners=PLAINTEXT://:9092 listeners=PLAINTEXT://:9093
log.dirs=/app/tmp/kafka-logs log.dirs=/app/tmp/kafka-logs-1

broker.id=0 broker.id=2
listeners=PLAINTEXT://:9092 listeners=PLAINTEXT://:9094
log.dirs=/app/tmp/kafka-logs log.dirs=/app/tmp/kafka-logs-2

broker.id=0 broker.id=3
listeners=PLAINTEXT://:9092 listeners=PLAINTEXT://:9095
log.dirs=/app/tmp/kafka-logs log.dirs=/app/tmp/kafka-logs-3

创建三个目录

[root@10-9-10-25 tmp]# mkdir /app/tmp/kafka-logs-1
[root@10-9-10-25 tmp]# mkdir /app/tmp/kafka-logs-2
[root@10-9-10-25 tmp]# mkdir /app/tmp/kafka-logs-3

  1. kafka-server-start.sh -daemon $KAFKA_HOME/config/server1.properties
  2. kafka-server-start.sh -daemon $KAFKA_HOME/config/server2.properties
  3. kafka-server-start.sh -daemon $KAFKA_HOME/config/server3.properties

jps一下

创建topic

kafka-topics.sh —create —zookeeper localhost:2181 —replication-factor 3 —partitions 1 —topic hello2_topic

  1. #3个副本
  2. --replication-factor 3
  3. #一个分区
  4. --partitions 1

查看所有topic

kafka-topics.sh —list —zookeeper localhost:2181

  1. [root@10-9-10-25 tmp]# kafka-topics.sh --list --zookeeper localhost:2181
  2. hello2_topic
  3. hello_topic

一个是单节点单broker创建的(hello_topic) 一个是刚才创建的(hello2_topic)

查看详情

kafka-topics.sh —describe —zookeeper localhost:2181

  1. [root@10-9-10-25 tmp]# kafka-topics.sh --describe --zookeeper localhost:2181
  2. Topic:hello2_topic PartitionCount:1 ReplicationFactor:3 Configs:
  3. Topic: hello2_topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
  4. Topic:hello_topic PartitionCount:1 ReplicationFactor:1 Configs:
  5. Topic: hello_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0

看这个

Topic:hello2_topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: hello2_topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3

PartitionCount:1——分区系数是1
ReplicationFactor:3 ——副本数是3
Leader: 1——一个分区3个副本必然有一个是主的,1这个是主的 (broker定义的id)
Replicas: 1,2,3——副本存放在哪
Isr: 1,2,3——当前活的是哪

java整合kafka踩坑

https://coding.imooc.com/learn/questiondetail/72164.html

  1. builder.setBolt("test1",new TestBolt()).shuffleGrouping("testSpout","streamid1");
  2. builder.setBolt("test2",new TestBolt2()).shuffleGrouping("testSpout","streamid2");
  1. if (data==0){
  2. this.spoutOutputCollector.emit("streamid1",new Values(data));
  3. }else{
  4. this.spoutOutputCollector.emit("streamid2",new Values(data));
  5. }