Kafka安装 -单节点-单broker
Kafka架构
producer
生产者consumer
消费者broker
存储数据topic
主题,标签。 这个数据是给谁消费的。
举个例子:母亲做馒头,孩子吃馒头,那么做馒头的母亲就是producer,孩子吃馒头就是consumer,做好的馒头放到篮子里等着孩子吃那么篮子就是broker,母亲有两个孩子,大孩子吃大馒头小孩子吃小馒头,那么给大馒头和小馒头各打一个标签,大孩子看到大馒头的标签就吃大馒头,小孩子看到小馒头的标签就吃小馒头
下载Kafka
解压
设置环境变量
$KAFKA_HOME/conf/server.properties
broker.id=0
listeners
host.name
log.dirs 这个路径不要写/tmp下不然一重启就没了可以自定义一个路径比如:/app/tmp/kafka-logs
zookeeper.connect:zk的地址 ip:port
启动kafka(先启动zookeeper)
kafka-server-start.sh
USAGE : /app/kafka_2.11-0.9.0.0/bin/kafka-server-start.sh [-daemon] server.properties [—override property=value]*
启动命令:
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
# 后台启动
-daemon
jps一下看看是否多了一个进程
创建topic
kafka-topics.sh —create —zookeeper localhost:2181 —replication-factor 1 —partitions 1 —topic hello_topic
#创建一个topic
kafka-topics.sh --create
#指向zookeeper的地址
--zookeeper localhost:2181
# 副本系数是1
--replication-factor 1
# 一个分区
--partitions 1
#topic的名称是hello_topic
--topic hello_topic
查看所有topic
kafka-topics.sh —list —zookeeper localhost:2181
发送消息
kafka-console-producer.sh —broker-list localhost:9092 —topic hello_topic
#**** 注意这里:这个9092是$KAFKA_HOME/conf/server.properties这里配置的listeners*****
--broker-list localhost:9092
消费消息:broker
kafka-console-consumer.sh —zookeeper localhost:2181 —topic hello_topic —from-beginning
# 此处的端口2181 别和发送消息搞混了, 这个2181是zookeeper的端口
--zookeeper localhost:2181
0.9之后
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic location --from-beginning
生产消息然后查看是否消费
# ******生产*****
[root@10-9-10-25 config]# kafka-console-producer.sh --broker-list localhost:9092 --topic hello_topic
111
11111
111111
hello
#******消费*******
[root@10-9-10-25 /]# kafka-console-consumer.sh --zookeeper localhost:2181 --topic hello_topic --from-beginning
111
11111
111111
hello
此时,如果把消费关了ctrl+c ,再次运行消费消息的命令,会看到之前的数据又出现了
[root@10-9-10-25 /]# kafka-console-consumer.sh --zookeeper localhost:2181 --topic hello_topic --from-beginning
111
11111
111111
hello
^CProcessed a total of 5 messages
[root@10-9-10-25 /]# kafka-console-consumer.sh --zookeeper localhost:2181 --topic hello_topic --from-beginning
111
11111
111111
hello
=========================
单节点多broker
假如都在一台机器上
在conf目录下复制3份 server.properties
[root@10-9-10-25 config]# cp server.properties server1.properties
[root@10-9-10-25 config]# cp server.properties server2.properties
[root@10-9-10-25 config]# cp server.properties server3.properties
分别编辑server123
broker.id=0 broker.id=1
listeners=PLAINTEXT://:9092 listeners=PLAINTEXT://:9093
log.dirs=/app/tmp/kafka-logs log.dirs=/app/tmp/kafka-logs-1
broker.id=0 broker.id=2
listeners=PLAINTEXT://:9092 listeners=PLAINTEXT://:9094
log.dirs=/app/tmp/kafka-logs log.dirs=/app/tmp/kafka-logs-2
broker.id=0 broker.id=3
listeners=PLAINTEXT://:9092 listeners=PLAINTEXT://:9095
log.dirs=/app/tmp/kafka-logs log.dirs=/app/tmp/kafka-logs-3
创建三个目录
[root@10-9-10-25 tmp]# mkdir /app/tmp/kafka-logs-1
[root@10-9-10-25 tmp]# mkdir /app/tmp/kafka-logs-2
[root@10-9-10-25 tmp]# mkdir /app/tmp/kafka-logs-3
kafka-server-start.sh -daemon $KAFKA_HOME/config/server1.properties
kafka-server-start.sh -daemon $KAFKA_HOME/config/server2.properties
kafka-server-start.sh -daemon $KAFKA_HOME/config/server3.properties
jps一下
创建topic
kafka-topics.sh —create —zookeeper localhost:2181 —replication-factor 3 —partitions 1 —topic hello2_topic
#3个副本
--replication-factor 3
#一个分区
--partitions 1
查看所有topic
kafka-topics.sh —list —zookeeper localhost:2181
[root@10-9-10-25 tmp]# kafka-topics.sh --list --zookeeper localhost:2181
hello2_topic
hello_topic
一个是单节点单broker创建的(hello_topic) 一个是刚才创建的(hello2_topic)
查看详情
kafka-topics.sh —describe —zookeeper localhost:2181
[root@10-9-10-25 tmp]# kafka-topics.sh --describe --zookeeper localhost:2181
Topic:hello2_topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: hello2_topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic:hello_topic PartitionCount:1 ReplicationFactor:1 Configs:
Topic: hello_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
看这个
Topic:hello2_topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: hello2_topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
PartitionCount:1——分区系数是1
ReplicationFactor:3 ——副本数是3
Leader: 1——一个分区3个副本必然有一个是主的,1这个是主的 (broker定义的id)
Replicas: 1,2,3——副本存放在哪
Isr: 1,2,3——当前活的是哪
java整合kafka踩坑
https://coding.imooc.com/learn/questiondetail/72164.html
builder.setBolt("test1",new TestBolt()).shuffleGrouping("testSpout","streamid1");
builder.setBolt("test2",new TestBolt2()).shuffleGrouping("testSpout","streamid2");
if (data==0){
this.spoutOutputCollector.emit("streamid1",new Values(data));
}else{
this.spoutOutputCollector.emit("streamid2",new Values(data));
}