## Plain Kafka cluster setup

### 1. Preparation

  1. Host information

     | Hostname | IP address | Hardware |
     | --- | --- | --- |
     | kafka-01 | 172.17.20.50 | 2C1G8G |
     | kafka-02 | 172.17.20.51 | 2C1G8G |
     | kafka-03 | 172.17.20.52 | 2C1G8G |
  2. Disable the firewall and SELinux

     ```shell
     systemctl stop firewalld
     systemctl disable firewalld
     setenforce 0
     vim /etc/selinux/config   # change SELINUX=enforcing to SELINUX=disabled
     ```

  3. Install the Java environment

     `yum -y install java-1.8.0-openjdk*`

     `java -version`



### 2. Set up the ZooKeeper cluster

Working directory: /usr/local (or any directory you choose)

`wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz`

`tar xf zookeeper-3.4.10.tar.gz -C /usr/local/`

`mv /usr/local/zookeeper-3.4.10 /usr/local/zookeeper`

Add ZooKeeper to the PATH (`vim /etc/profile`):

export ZK_HOME=/usr/local/zookeeper
export PATH=$ZK_HOME/bin:$PATH

`source /etc/profile`

`cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg`


Edit the configuration file: `# vim /usr/local/zookeeper/conf/zoo.cfg`

Set the data directory:

dataDir=/usr/local/zookeeper/data

Add the ensemble members:

server.1=172.17.20.50:2888:3888
server.2=172.17.20.51:2888:3888
server.3=172.17.20.52:2888:3888


Create the data directory: `mkdir /usr/local/zookeeper/data`

Create a `myid` file in the data directory containing this machine's server ID; for example, on 172.17.20.50 (server.1): `echo "1" > /usr/local/zookeeper/data/myid`

Start the ZooKeeper service on every node: `/usr/local/zookeeper/bin/zkServer.sh start`

Check ZooKeeper status: `/usr/local/zookeeper/bin/zkServer.sh status`

Each node should report `Mode: leader` or `Mode: follower`.
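
The per-node settings above are mechanical, so for larger ensembles it can help to script them. A small illustrative sketch (the host table comes from this guide; all names here are assumptions, not part of ZooKeeper itself):

```python
# Hypothetical helper: derive each node's zoo.cfg "server.N" lines and
# myid content from the host table in this guide.
ZK_HOSTS = {1: "172.17.20.50", 2: "172.17.20.51", 3: "172.17.20.52"}


def zoo_cfg_lines(hosts):
    """server.N=ip:2888:3888 lines for zoo.cfg (identical on every node)."""
    return ["server.%d=%s:2888:3888" % (sid, ip) for sid, ip in sorted(hosts.items())]


def myid_for(ip, hosts):
    """The server ID to write into <dataDir>/myid on the host with this IP."""
    for sid, host_ip in hosts.items():
        if host_ip == ip:
            return str(sid)
    raise ValueError("unknown host: %s" % ip)


if __name__ == "__main__":
    print("\n".join(zoo_cfg_lines(ZK_HOSTS)))
    print(myid_for("172.17.20.50", ZK_HOSTS))
```

The key invariant is that the `N` in `server.N` must equal the number written to that node's `myid` file.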


### 3. Set up the Kafka cluster

`wget http://mirror.bit.edu.cn/apache/kafka/2.4.1/kafka_2.12-2.4.1.tgz`

`tar xf kafka_2.12-2.4.1.tgz -C /usr/local/`

`mv /usr/local/kafka_2.12-2.4.1 /usr/local/kafka`

Set the Kafka environment (`vim /etc/profile`):

export KAFKA_HOME=/usr/local/kafka
export PATH=${KAFKA_HOME}/bin:$PATH

`source /etc/profile`


Edit the configuration file `.../kafka/config/server.properties`:

broker.id=1                 # must be unique for each broker
listeners=PLAINTEXT://172.17.20.50:9092
log.dirs=/data/kafka        # ideally not on the same disk as the ZooKeeper data
zookeeper.connect=172.17.20.50:2181,172.17.20.51:2181,172.17.20.52:2181
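Only `broker.id` and `listeners` differ between the three brokers, so the per-node file can be generated rather than hand-edited. A minimal sketch using the hosts from this guide (the helper name and structure are illustrative):

```python
# Hypothetical helper: render the per-broker server.properties shown above.
BROKERS = {1: "172.17.20.50", 2: "172.17.20.51", 3: "172.17.20.52"}
ZK_CONNECT = ",".join("%s:2181" % ip for ip in BROKERS.values())


def server_properties(broker_id):
    """Return the server.properties text for one broker."""
    ip = BROKERS[broker_id]
    return "\n".join([
        "broker.id=%d" % broker_id,
        "listeners=PLAINTEXT://%s:9092" % ip,
        "log.dirs=/data/kafka",
        "zookeeper.connect=%s" % ZK_CONNECT,
    ])


if __name__ == "__main__":
    print(server_properties(1))
```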


Start Kafka (make sure ZooKeeper is running first): `/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties`

Create a topic: `/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 172.17.20.50:2181 --replication-factor 1 --partitions 1 --topic test-topic`

List topics: `/usr/local/kafka/bin/kafka-topics.sh --zookeeper 172.17.20.50:2181 --list`

## Kafka cluster with SCRAM authorization

### 1. Overview

A SCRAM-authorized Kafka cluster provides the following:

- Clients must supply a username and password; access without credentials is rejected
- Read/write separation: some accounts may only consume, some may only produce, and one account can also hold both roles
- Topic isolation: an account can only access the topics it has been granted; unauthorized topics are inaccessible
- Consumer-group isolation: an account cannot use a group.id it has not been granted
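
SCRAM never stores the plaintext password: what `kafka-configs.sh` writes to ZooKeeper are keys derived from it. A minimal sketch of that derivation, per RFC 5802 (the function name is illustrative; 4096 is Kafka's default iteration count):

```python
# Sketch of how a broker derives the SCRAM-SHA-512 credentials that
# kafka-configs.sh stores in ZooKeeper (RFC 5802 key derivation).
import base64
import hashlib
import hmac
import os


def scram_sha512_credentials(password, salt=None, iterations=4096):
    """Derive the salt, StoredKey and ServerKey for a SCRAM user."""
    salt = salt if salt is not None else os.urandom(16)
    # SaltedPassword = PBKDF2-HMAC-SHA512(password, salt, iterations)
    salted = hashlib.pbkdf2_hmac("sha512", password.encode(), salt, iterations)
    # StoredKey = H(HMAC(SaltedPassword, "Client Key"))
    client_key = hmac.new(salted, b"Client Key", hashlib.sha512).digest()
    stored_key = hashlib.sha512(client_key).digest()
    # ServerKey = HMAC(SaltedPassword, "Server Key")
    server_key = hmac.new(salted, b"Server Key", hashlib.sha512).digest()
    return {
        "salt": base64.b64encode(salt).decode(),
        "stored_key": base64.b64encode(stored_key).decode(),
        "server_key": base64.b64encode(server_key).decode(),
        "iterations": iterations,
    }
```

Because only `stored_key` and `server_key` are kept, reading the credential entries out of ZooKeeper does not directly reveal client passwords.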

### 2. Installation and deployment

1. Install the ZooKeeper cluster normally, as described above
2. Create `kafka_server_jaas.conf`:

KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin";
};


3. Create the inter-broker (admin) user: `bin/kafka-configs.sh --zookeeper 172.17.20.50:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name admin`<br />**Note: the username and password in steps 2 and 3 must match**
4. Edit the Kafka configuration file server.properties:

broker.id=1
listeners=SASL_PLAINTEXT://172.17.20.50:9092
advertised.listeners=SASL_PLAINTEXT://172.17.20.50:9092
num.network.threads=3
num.io.threads=8
queued.max.requests=500
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka
log.retention.hours=72
message.max.bytes=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
num.partitions=2
num.recovery.threads.per.data.dir=1
log.cleanup.policy=delete
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=172.17.20.50:2181,172.17.20.51:2181,172.17.20.52:2181
zookeeper.connection.timeout.ms=6000
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin

<br />Notes:
   - In a cluster, server.properties differs on each machine; adjust broker.id, listeners, advertised.listeners, log.dirs, zookeeper.connect, etc.
   - super.users must name the account configured in kafka_server_jaas.conf
   - If you switch the mechanism from SCRAM-SHA-512 to SCRAM-SHA-256, the settings above must change accordingly; SCRAM-SHA-512 is recommended
   - This configuration works best on 3 or more broker nodes; with fewer brokers, replication fails for lack of brokers, which can be worked around by lowering the default partition count and replication factor
5. Start the Kafka service<br />Foreground start (for testing):

KAFKA_OPTS=-Djava.security.auth.login.config=/usr/local/kafka/conf/kafka_server_jaas.conf bin/kafka-server-start.sh /usr/local/kafka/config/server.properties > /dev/null &

<br />Background start (for production):

vim bin/kafka-run-class.sh

Change KAFKA_OPTS="" to KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/conf/kafka_server_jaas.conf"

nohup bin/kafka-server-start.sh /usr/local/kafka/config/server.properties > /dev/null &
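
For long-running deployments, the broker can also be supervised by systemd instead of nohup. A minimal unit sketch using the paths from this guide (the unit name and layout are illustrative, not part of the Kafka distribution):

```ini
# /etc/systemd/system/kafka.service (illustrative)
[Unit]
Description=Apache Kafka broker
After=network.target

[Service]
Type=simple
Environment="KAFKA_OPTS=-Djava.security.auth.login.config=/usr/local/kafka/conf/kafka_server_jaas.conf"
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target
```

Enable it with `systemctl daemon-reload && systemctl enable --now kafka`.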



### 3. Account management

1. Create regular users
   - Writer account:

bin/kafka-configs.sh --zookeeper 172.17.20.50:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=wr-test],SCRAM-SHA-512=[password=wr-test]' --entity-type users --entity-name writer

   - Reader account:

bin/kafka-configs.sh --zookeeper 172.17.20.50:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=rd-test],SCRAM-SHA-512=[password=rd-test]' --entity-type users --entity-name reader


2. Create the test topic:

bin/kafka-topics.sh --create --zookeeper 172.17.20.50:2181 --topic test --partitions 1 --replication-factor 1


3. Grant write access to the test topic (required for producer accounts):

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.17.20.50:2181 --add --allow-principal User:writer --operation Write --topic test


4. Grant read access to the test topic (required for consumer accounts):

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.17.20.50:2181 --add --allow-principal User:reader --operation Read --topic test


5. Allow the account to use the test-group consumer group (required for consumer accounts):

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.17.20.50:2181 --add --allow-principal User:reader --operation Read --group test-group


6. Consumer configuration
   - Create conf/consumer.conf:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="reader" password="rd-test";

<br />**Note: the username and password here must match an account created under account management above**
   - Start the consumer:

bin/kafka-console-consumer.sh --bootstrap-server 172.17.20.50:9092 --topic test --from-beginning --consumer.config /usr/local/kafka/conf/consumer.conf --group test-group

<br />**Note: the account used must have permission to read the topic and use the group**
7. Producer configuration
   - Create conf/producer.conf:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="writer" password="wr-test";

   - Start the producer:

bin/kafka-console-producer.sh --broker-list 172.17.20.50:9092 --topic test --producer.config /usr/local/kafka/conf/producer.conf



## Using Kafka from Python

Install kafka-python: `pip install kafka-python==2.0.2`

Producer.py:

```python
# -*- coding: utf-8 -*-
from kafka import KafkaProducer

producer = KafkaProducer(
    sasl_mechanism="SCRAM-SHA-512",
    security_protocol='SASL_PLAINTEXT',
    sasl_plain_username="writer",
    sasl_plain_password="wr-test",
    bootstrap_servers=['172.17.20.50:9092']
)


def send_msg():
    for i in range(10):
        msg = "msg %d" % i
        # print(msg)
        future = producer.send('test', msg.encode(), key="test".encode())
        print(future)
        result = future.get(timeout=10)
        #time.sleep(5)
        print(result)
    producer.close()
    return "put OK"


if __name__ == '__main__':
    res = send_msg()
    print(res)
```

Consumer.py:

```python
# -*- coding: utf-8 -*-
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'test',
    bootstrap_servers=['172.17.20.51:9092'],
    sasl_mechanism="SCRAM-SHA-512",
    security_protocol='SASL_PLAINTEXT',
    sasl_plain_username="xxx",
    sasl_plain_password="xxx",
    group_id='test-group'
)


def get_msg():
    for msg in consumer:
        recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
        print(recv)
    return "get_end"


if __name__ == '__main__':
    res = get_msg()
    print(res)
```