准备三台服务器
    192.168.188.138
    192.168.188.139
    192.168.188.140
    由于kafka运行需要安装zookeeper,而zookeeper需要安装jdk,需首先安装jdk,其次安装zookeeper,最后安装kafka
    1、首先三台服务器上传:jdk-8u191-linux-x64.tar.gz kafka_2.11-2.1.0.tgz,由于zookeeper安装包在kafka安装包内,故只需要上传kafka安装包即可
    rz jdk-8u191-linux-x64.tar.gz kafka_2.11-2.1.0.tgz
    tar xf jdk-8u191-linux-x64.tar.gz -C /usr/local
    tar xf kafka_2.11-2.1.0.tgz -C /usr/local
    cd /usr/local && mv jdk1.8.0_191 java && mv kafka_2.11-2.1.0 kafka
    添加Java环境变量
    vim /etc/profile
    JAVA_HOME=/usr/local/java
    PATH=$JAVA_HOME/bin:$PATH
    export JAVA_HOME PATH
    source /etc/profile #使环境变量生效
    2、首先部署zookeeper
    编写zookeeper配置文件:
    cd /usr/local/kafka/ && sed -i ‘s/^[^#]/#&/‘ config/zookeeper.properties #将默认zookeeper配置文件注释
    vim config/zookeeper.properties
    dataDir=/data/zookeeper/data #zookeeper数据存放目录。
    dataLogDir=/data/zookeeper/logs #zookeeper日志文件存放目录
    clientPort=2181 #客户端连接zookeeper服务的端口
    tickTime=2000 #zookeeper服务器之间或客户端与服务器之间维持心跳的时间间隔。
    initLimit=20 #允许follower(相对于Leaderer言的“客户端”)连接并同步到Leader的初始化连接时间,以tickTime为单位。当初始化连接时间超过该值,则表示连接失败。
    syncLimit=10 #Leader与Follower之间发送消息时,请求和应答时间长度。如果follower在设置时间内不能与leader通信,那么此follower将会被丢弃。
    server.1=192.168.188.138:2888:3888 #kafka集群IP:Port
    server.2=192.168.188.139:2888:3888
    server.3=192.168.188.140:2888:3888
    #注:2888是follower与leader交换信息的端口,3888是当leader挂了时用来执行选举时服务器相互通信的端口。
    3、部署kafka:
    cd /usr/local/kafka/ && sed -i ‘s/^[^#]/#&/‘ config/server.properties #将默认kafka配置文件注释
    vim config/server.properties
    broker.id=1 #每个server需要单独配置broker id,如果不配置系统会自动配置。
    listeners=PLAINTEXT://192.168.188.138:9092 #监听地址,格式PLAINTEXT://IP:端口。
    num.network.threads=3 #接收和发送网络信息的线程数。
    num.io.threads=6 # 服务器用于处理请求的线程数,其中可能包括磁盘I/O。
    socket.send.buffer.bytes=102400 #套接字服务器使用的发送缓冲区(SO_SNDBUF)
    socket.receive.buffer.bytes=102400 #套接字服务器使用的接收缓冲区(SO_RCVBUF)
    socket.request.max.bytes=104857600 #套接字服务器将接受的请求的最大大小(防止OOM)
    log.dirs=/data/kafka/logs #日志文件目录。
    num.partitions=6 #partition数量。
    num.recovery.threads.per.data.dir=1 #在启动时恢复日志、关闭时刷盘日志每个数据目录的线程的数量,默认1。
    offsets.topic.replication.factor=2 #偏移量话题的复制因子(设置更高保证可用),为了保证有效的复制,偏移话题的复制因子是可配置的,在偏移话题的第一次请求的时候可用的broker的数量至少为复制因子的大小,否则要么话题创建失败,要么复制因子取可用broker的数量和配置复制因子的最小值。
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=168 #日志文件删除之前保留的时间(单位小时),默认168
    log.segment.bytes=536870912 #单个日志文件的大小,默认1073741824
    log.retention.check.interval.ms=300000 #检查日志段以查看是否可以根据保留策略删除它们的时间间隔。
    zookeeper.connect=192.168.188.138:2181,192.168.188.139:2181,192.168.188.140:2181 #zookeeper主机地址,如果zookeeper是集群则以逗号隔开。
    zookeeper.connection.timeout.ms=6000 #连接到Zookeeper的超时时间。
    group.initial.rebalance.delay.ms=0
    配置其他两台服务器kafka配置文件,除(broker.id,myid)不同,listeners其ip根据当前主机ip修改,其他全部一致
    192.168.188.138:echo 1 > /data/zookeeper/data/myid
    192.168.188.139:echo 2 > /data/zookeeper/data/myid
    192.168.188.140:echo 3 > /data/zookeeper/data/myid
    4、依次启动zookeeper、kafka
    cd /usr/local/kafka
    nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
    nohup bin/kafka-server-start.sh config/server.properties &
    检测是否启动:
    yum -y install nmap ncat && echo conf | nc 192.168.188.138 2181
    clientPort=2181
    dataDir=/data/zookeeper/data/version-2
    dataLogDir=/data/zookeeper/logs/version-2
    tickTime=2000
    maxClientCnxns=60
    minSessionTimeout=4000
    maxSessionTimeout=40000
    serverId=1
    initLimit=20
    syncLimit=10
    electionAlg=3
    electionPort=3888
    quorumPort=2888
    peerType=0
    echo stat | nc 192.168.188.138 2181
    Zookeeper version: 3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 00:39 GMT
    Clients:
    /192.168.188.138:575500

    Latency min/avg/max: 0/0/0
    Received: 1
    Sent: 0
    Connections: 1
    Outstanding: 0
    Zxid: 0x900000000
    Mode: leader
    Node count: 136
    Proposal sizes last/min/max: -1/-1/-1
    创建topic:
    bin/kafka-topics.sh —create —zookeeper 192.168.188.138:2181 —replication-factor —partitions 1 —topic topic1 #topic1为创建的话题名字
    Created topic “topic1” #提示创建topic1成功
    查询topic
    bin/kafka-topics.sh —zookeeper 192.168.188.138:2181 —list
    __consumer_offsets
    topic1
    模拟生产和消费记录 从138服务器发送消息到139服务器
    生产producer:
    bin/kafka-console-producer.sh —broker-list 192.168.188.138:9092 —topic topic1
    >hello #终端输入hello
    消费consumer:
    bin/kafka-console-consumer.sh —broker-server 192.168.188.138:9092 —topic topic1 —from-beginning
    hello #终端输出hello
    查看主题信息:
    bin/kafka-topic.sh —describe 192.168.188.138:2181 —topic topic1
    Topic:topic1 PartitionCount:1 ReplicationFactor:1 Configs:
    Topic: topic1 Partition: 0 Leader: 2 Replicas: 2 Isr: 2