## Setting Up a Basic Kafka Cluster
### 1. Preparation
- Host information:

| Hostname | IP address | Hardware |
| ---- | ---- | ---- |
| kafka-01 | 172.17.20.50 | 2C1G8G |
| kafka-02 | 172.17.20.51 | 2C1G8G |
| kafka-03 | 172.17.20.52 | 2C1G8G |
2. Disable the firewall and SELinux

systemctl stop firewalld
systemctl disable firewalld
setenforce 0
vim /etc/selinux/config
Change SELINUX=enforcing to SELINUX=disabled
3. Install the Java environment

yum -y install java-1.8.0-openjdk*
java -version
### 2. Set up the ZooKeeper cluster
Working directory: /usr/local (any directory may be used)
wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
tar xf zookeeper-3.4.10.tar.gz -C /usr/local/
mv zookeeper-3.4.10 zookeeper
vim /etc/profile
export ZK_HOME=/usr/local/zookeeper
export PATH=$ZK_HOME/bin:$PATH
source /etc/profile
cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg
Edit the configuration file: `# vim /usr/local/zookeeper/conf/zoo.cfg`
Change the data directory path:
dataDir=/usr/local/zookeeper/data
Add the cluster member list:
server.1=172.17.20.50:2888:3888
server.2=172.17.20.51:2888:3888
server.3=172.17.20.52:2888:3888
Create the data directory: `mkdir /usr/local/zookeeper/data`
Create a `myid` file under the data directory containing this host's server ID, e.g. 172.17.20.50 --> server.1: `echo "1" > /usr/local/zookeeper/data/myid`
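The myid assigned to each host must match its `server.N` entry in zoo.cfg. This mapping can be derived from the config itself; below is a small, hypothetical Python helper (the function name and sample config are my own, not part of the original guide):

```python
# Hypothetical helper: derive this host's ZooKeeper myid from the
# server.N entries in zoo.cfg.

def myid_for_host(zoo_cfg_text, host_ip):
    """Return the server ID whose zoo.cfg entry matches host_ip, or None."""
    for line in zoo_cfg_text.splitlines():
        line = line.strip()
        if line.startswith("server.") and "=" in line:
            key, value = line.split("=", 1)
            server_id = key.split(".", 1)[1]   # "server.1" -> "1"
            ip = value.split(":", 1)[0]        # "172.17.20.50:2888:3888" -> IP
            if ip == host_ip:
                return server_id
    return None

cfg = """
dataDir=/usr/local/zookeeper/data
server.1=172.17.20.50:2888:3888
server.2=172.17.20.51:2888:3888
server.3=172.17.20.52:2888:3888
"""

if __name__ == "__main__":
    print(myid_for_host(cfg, "172.17.20.50"))  # -> 1
```

Writing the returned ID into `dataDir/myid` on each host keeps the two files consistent and avoids the classic copy-paste mistake of three nodes sharing the same myid.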
Start the ZooKeeper service on every node: `/usr/local/zookeeper/bin/zkServer.sh start`
Check the ZooKeeper status:
/usr/local/zookeeper/bin/zkServer.sh status
Mode: leader or follower
### 3. Set up the Kafka cluster
wget http://mirror.bit.edu.cn/apache/kafka/2.4.1/kafka_2.12-2.4.1.tgz
tar xf kafka_2.12-2.4.1.tgz -C /usr/local/
mv kafka_2.12-2.4.1 kafka
vim /etc/profile
# set kafka environment
export KAFKA_HOME=/usr/local/kafka
export PATH=${KAFKA_HOME}/bin:$PATH
source /etc/profile
Edit the configuration file `.../kafka/config/server.properties`:
broker.id=1    # must be different on each machine
listeners=PLAINTEXT://172.17.20.50:9092
log.dirs=/data/kafka    # best kept on a different disk from the ZooKeeper data
zookeeper.connect=172.17.20.50:2181,172.17.20.51:2181,172.17.20.52:2181
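Only a handful of settings differ per broker, so a throwaway script can render them for each machine. A minimal sketch, with illustrative names of my own (not from the original guide):

```python
# Sketch: render the per-broker server.properties lines that must differ
# on each machine of the cluster. BROKERS and the function name are
# illustrative assumptions.

BROKERS = ["172.17.20.50", "172.17.20.51", "172.17.20.52"]

def render_broker_config(broker_id, ip, brokers):
    """Return the broker-specific settings as a properties snippet."""
    zk = ",".join("%s:2181" % b for b in brokers)
    return "\n".join([
        "broker.id=%d" % broker_id,
        "listeners=PLAINTEXT://%s:9092" % ip,
        "log.dirs=/data/kafka",
        "zookeeper.connect=%s" % zk,
    ])

if __name__ == "__main__":
    # One snippet per host, broker.id counted from 1
    for i, ip in enumerate(BROKERS, start=1):
        print(render_broker_config(i, ip, BROKERS))
        print()
```

Generating these lines (rather than hand-editing three files) makes duplicate broker.id values, the most common cluster-setup mistake, much harder to introduce.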
Start Kafka (make sure ZooKeeper is running first): `/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties`
Create a topic: `/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 172.17.20.50:2181 --replication-factor 1 --partitions 1 --topic test-topic`
List topics: `/usr/local/kafka/bin/kafka-topics.sh --zookeeper 172.17.20.50:2181 --list`
## Kafka Cluster with SCRAM Authentication
### 1. Overview
A Kafka cluster with SCRAM authorization provides the following:

- Clients must supply a username and password to connect; access without credentials is denied
- Read and write permissions are separated: some accounts may only consume, others may only produce, and a single account can also hold both roles
- Topic isolation: an account can only access the topics it has been granted; unauthorized topics are inaccessible
- Consumer-group isolation: an account cannot use a group id it has not been granted
### 2. Installation and deployment
1. Install the ZooKeeper cluster normally, as described above
2. Create `kafka_server_jaas.conf`:

KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin";
};

3. Create the inter-broker user: `bin/kafka-configs.sh --zookeeper 172.17.20.50:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name admin`<br />**Note: the username and password in steps 2 and 3 must be the same**
4. Modify the Kafka configuration file server.properties:

broker.id=1
listeners=SASL_PLAINTEXT://172.17.20.50:9092
advertised.listeners=SASL_PLAINTEXT://172.17.20.50:9092
num.network.threads=3
num.io.threads=8
queued.max.requests=500
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka
log.retention.hours=72
message.max.bytes=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
num.partitions=2
num.recovery.threads.per.data.dir=1
log.cleanup.policy=delete
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=172.17.20.50:2181,172.17.20.51:2181,172.17.20.52:2181
zookeeper.connection.timeout.ms=6000
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
<br />Notes:

- In a cluster deployment, server.properties differs on each machine: adjust broker.id, listeners, advertised.listeners, log.dirs, zookeeper.connect, etc.
- super.users must name the account configured in kafka_server_jaas.conf
- If SCRAM-SHA-512 is changed to SCRAM-SHA-256, the configuration above does not apply as-is; SCRAM-SHA-512 is recommended
- This configuration is best used on 3 or more nodes; with fewer, topic creation can fail because there are not enough brokers for the requested replicas. If that happens, lower the default partition count and default replication factor
5. Start the Kafka service<br />Foreground (for testing):

KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/conf/kafka_server_jaas.conf" bin/kafka-server-start.sh /usr/local/kafka/config/server.properties > /dev/null &

<br />Background (for production):

vim bin/kafka-run-class.sh
Change KAFKA_OPTS="" to KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/conf/kafka_server_jaas.conf"
nohup bin/kafka-server-start.sh /usr/local/kafka/config/server.properties > /dev/null &
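The SCRAM credentials created with kafka-configs.sh are never stored as plain text: ZooKeeper holds only salted, iterated hashes. Kafka's SCRAM mechanisms follow RFC 5802, whose key derivation can be sketched with the Python standard library; the salt and the 8192 iteration count below are illustrative values, not taken from a real broker:

```python
import hashlib
import hmac

# RFC 5802 SCRAM key derivation (as used by SCRAM-SHA-512):
#   SaltedPassword = PBKDF2-HMAC(password, salt, iterations)
#   ClientKey      = HMAC(SaltedPassword, "Client Key")
#   StoredKey      = H(ClientKey)   <- what the server actually keeps

def scram_stored_key(password, salt, iterations):
    """StoredKey for SCRAM-SHA-512 per RFC 5802."""
    salted = hashlib.pbkdf2_hmac("sha512", password.encode(), salt, iterations)
    client_key = hmac.new(salted, b"Client Key", hashlib.sha512).digest()
    return hashlib.sha512(client_key).digest()

if __name__ == "__main__":
    key = scram_stored_key("admin", b"example-salt", 8192)
    print(len(key))  # -> 64 (SHA-512 digest length)
```

Because only derived keys live in ZooKeeper, a leaked znode does not directly reveal the password, and the iteration count (e.g. iterations=8192 in the user-creation commands below) slows brute-force attempts.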
### 3. Account management
1. Create regular users
- Writer account:

bin/kafka-configs.sh --zookeeper 172.17.20.50:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=wr-test],SCRAM-SHA-512=[password=wr-test]' --entity-type users --entity-name writer

- Reader account:

bin/kafka-configs.sh --zookeeper 172.17.20.50:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=rd-test],SCRAM-SHA-512=[password=rd-test]' --entity-type users --entity-name reader

2. Create the test topic:

bin/kafka-topics.sh --create --zookeeper 172.17.20.50:2181 --topic test --partitions 1 --replication-factor 1

3. Allow the account to write to the test topic (required for producer accounts):

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.17.20.50:2181 --add --allow-principal User:writer --operation Write --topic test

4. Allow the account to read from the test topic (required for consumer accounts):

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.17.20.50:2181 --add --allow-principal User:reader --operation Read --topic test

5. Allow the account to use the test-group consumer group (required for consumer accounts):

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.17.20.50:2181 --add --allow-principal User:reader --operation Read --group test-group

6. Consumer configuration
- Create conf/consumer.conf:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="reader" password="rd-test";

<br />**Note: these credentials must match an account created in account management above**
- Start the consumer:

bin/kafka-console-consumer.sh --bootstrap-server 172.17.20.50:9092 --topic test --from-beginning --consumer.config /usr/local/kafka/conf/consumer.conf --group test-group

<br />**Note: the account used must have permission to read the topic and to use the group**
7. Producer configuration
- Create conf/producer.conf:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="writer" password="wr-test";

- Start the producer:

bin/kafka-console-producer.sh --broker-list 172.17.20.50:9092 --topic test --producer.config /usr/local/kafka/conf/producer.conf
## Python Usage
Install kafka-python: `pip install kafka-python==2.0.2`
Producer.py:
```python
# -*- coding: utf-8 -*-
from kafka import KafkaProducer

producer = KafkaProducer(
    sasl_mechanism="SCRAM-SHA-512",
    security_protocol="SASL_PLAINTEXT",
    sasl_plain_username="writer",
    sasl_plain_password="wr-test",
    bootstrap_servers=["172.17.20.50:9092"],
)


def send_msg():
    for i in range(10):
        msg = "msg %d" % i
        future = producer.send("test", msg.encode(), key="test".encode())
        print(future)
        # Block until the broker acknowledges the write
        result = future.get(timeout=10)
        print(result)
    producer.close()
    return "put OK"


if __name__ == '__main__':
    res = send_msg()
```
Consumer.py:
```python
# -*- coding: utf-8 -*-
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "test",
    bootstrap_servers=["172.17.20.51:9092"],
    sasl_mechanism="SCRAM-SHA-512",
    security_protocol="SASL_PLAINTEXT",
    sasl_plain_username="xxx",
    sasl_plain_password="xxx",
    group_id="test-group",
)


def get_msg():
    # Iterates indefinitely, blocking until messages arrive
    for msg in consumer:
        recv = "%s:%d:%d: key=%s value=%s" % (
            msg.topic, msg.partition, msg.offset, msg.key, msg.value)
        print(recv)
    return "get_end"


if __name__ == '__main__':
    res = get_msg()
    print(res)
```
