准备三台服务器
192.168.188.138
192.168.188.139
192.168.188.140
由于kafka运行需要安装zookeeper,而zookeeper需要安装jdk,需首先安装jdk,其次安装zookeeper,最后安装kafka
1、首先三台服务器上传:jdk-8u191-linux-x64.tar.gz kafka_2.11-2.1.0.tgz,由于zookeeper安装包在kafka安装包内,故只需要上传kafka安装包即可
rz jdk-8u191-linux-x64.tar.gz kafka_2.11-2.1.0.tgz
tar xf jdk-8u191-linux-x64.tar.gz -C /usr/local
tar xf kafka_2.11-2.1.0.tgz -C /usr/local
cd /usr/local && mv jdk1.8.0_191 java && mv kafka_2.11-2.1.0 kafka
添加Java环境变量
vim /etc/profile
JAVA_HOME=/usr/local/java
PATH=$JAVA_HOME/bin:$PATH
export JAVA_HOME PATH
source /etc/profile #使环境变量生效
2、首先部署zookeeper
编写zookeeper配置文件:
cd /usr/local/kafka/ && sed -i ‘s/^[^#]/#&/‘ config/zookeeper.properties #将默认zookeeper配置文件注释
vim config/zookeeper.properties
dataDir=/data/zookeeper/data #zookeeper数据存放目录。
dataLogDir=/data/zookeeper/logs #zookeeper日志文件存放目录
clientPort=2181 #客户端连接zookeeper服务的端口
tickTime=2000 #zookeeper服务器之间或客户端与服务器之间维持心跳的时间间隔。
initLimit=20 #允许follower(相对于Leaderer言的“客户端”)连接并同步到Leader的初始化连接时间,以tickTime为单位。当初始化连接时间超过该值,则表示连接失败。
syncLimit=10 #Leader与Follower之间发送消息时,请求和应答时间长度。如果follower在设置时间内不能与leader通信,那么此follower将会被丢弃。
server.1=192.168.188.138:2888:3888 #kafka集群IP:Port
server.2=192.168.188.139:2888:3888
server.3=192.168.188.140:2888:3888
#注:2888是follower与leader交换信息的端口,3888是当leader挂了时用来执行选举时服务器相互通信的端口。
3、部署kafka:
cd /usr/local/kafka/ && sed -i ‘s/^[^#]/#&/‘ config/server.properties #将默认kafka配置文件注释
vim config/server.properties
broker.id=1 #每个server需要单独配置broker id,如果不配置系统会自动配置。
listeners=PLAINTEXT://192.168.188.138:9092 #监听地址,格式PLAINTEXT://IP:端口。
num.network.threads=3 #接收和发送网络信息的线程数。
num.io.threads=6 # 服务器用于处理请求的线程数,其中可能包括磁盘I/O。
socket.send.buffer.bytes=102400 #套接字服务器使用的发送缓冲区(SO_SNDBUF)
socket.receive.buffer.bytes=102400 #套接字服务器使用的接收缓冲区(SO_RCVBUF)
socket.request.max.bytes=104857600 #套接字服务器将接受的请求的最大大小(防止OOM)
log.dirs=/data/kafka/logs #日志文件目录。
num.partitions=6 #partition数量。
num.recovery.threads.per.data.dir=1 #在启动时恢复日志、关闭时刷盘日志每个数据目录的线程的数量,默认1。
offsets.topic.replication.factor=2 #偏移量话题的复制因子(设置更高保证可用),为了保证有效的复制,偏移话题的复制因子是可配置的,在偏移话题的第一次请求的时候可用的broker的数量至少为复制因子的大小,否则要么话题创建失败,要么复制因子取可用broker的数量和配置复制因子的最小值。
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168 #日志文件删除之前保留的时间(单位小时),默认168
log.segment.bytes=536870912 #单个日志文件的大小,默认1073741824
log.retention.check.interval.ms=300000 #检查日志段以查看是否可以根据保留策略删除它们的时间间隔。
zookeeper.connect=192.168.188.138:2181,192.168.188.139:2181,192.168.188.140:2181 #zookeeper主机地址,如果zookeeper是集群则以逗号隔开。
zookeeper.connection.timeout.ms=6000 #连接到Zookeeper的超时时间。
group.initial.rebalance.delay.ms=0
配置其他两台服务器kafka配置文件,除(broker.id,myid)不同,listeners其ip根据当前主机ip修改,其他全部一致
192.168.188.138:echo 1 > /data/zookeeper/data/myid
192.168.188.139:echo 2 > /data/zookeeper/data/myid
192.168.188.140:echo 3 > /data/zookeeper/data/myid
4、依次启动zookeeper、kafka
cd /usr/local/kafka
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
nohup bin/kafka-server-start.sh config/server.properties &
检测是否启动:
yum -y install nmap ncat && echo conf | nc 192.168.188.138 2181
clientPort=2181
dataDir=/data/zookeeper/data/version-2
dataLogDir=/data/zookeeper/logs/version-2
tickTime=2000
maxClientCnxns=60
minSessionTimeout=4000
maxSessionTimeout=40000
serverId=1
initLimit=20
syncLimit=10
electionAlg=3
electionPort=3888
quorumPort=2888
peerType=0
echo stat | nc 192.168.188.138 2181
Zookeeper version: 3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 00:39 GMT
Clients:
/192.168.188.138:575500
Latency min/avg/max: 0/0/0
Received: 1
Sent: 0
Connections: 1
Outstanding: 0
Zxid: 0x900000000
Mode: leader
Node count: 136
Proposal sizes last/min/max: -1/-1/-1
创建topic:
bin/kafka-topics.sh —create —zookeeper 192.168.188.138:2181 —replication-factor —partitions 1 —topic topic1 #topic1为创建的话题名字
Created topic “topic1” #提示创建topic1成功
查询topic
bin/kafka-topics.sh —zookeeper 192.168.188.138:2181 —list
__consumer_offsets
topic1
模拟生产和消费记录 从138服务器发送消息到139服务器
生产producer:
bin/kafka-console-producer.sh —broker-list 192.168.188.138:9092 —topic topic1
>hello #终端输入hello
消费consumer:
bin/kafka-console-consumer.sh —broker-server 192.168.188.138:9092 —topic topic1 —from-beginning
hello #终端输出hello
查看主题信息:
bin/kafka-topic.sh —describe 192.168.188.138:2181 —topic topic1
Topic:topic1 PartitionCount:1 ReplicationFactor:1 Configs:
Topic: topic1 Partition: 0 Leader: 2 Replicas: 2 Isr: 2
