Kafka and ZooKeeper with SASL/PLAIN authentication
ZooKeeper configuration
- Modify zoo.cfg and add the following three lines:
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=360000
- Configure the JAAS file: create zk_server_jaas.conf under the conf directory (it defines the usernames and passwords that clients must use when connecting to the ZooKeeper server)
vim zk_server_jaas.conf
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-sec"
user_admin="admin-sec";
};
- Add the required JARs (copy them from kafka/libs to zookeeper/lib):
kafka-clients-0.10.0.1.jar
lz4-1.3.0.jar
slf4j-api-1.7.21.jar
slf4j-log4j12-1.7.21.jar
snappy-java-1.1.2.6.jar
- Modify zkEnv.sh (append the following as a new last line):
export SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZOOKEEPER_PREFIX/conf/zk_server_jaas.conf"
- Start ZooKeeper
/opt/lingmou/zookeeper/bin/zkServer.sh start
Kafka broker configuration
- Configure server.properties
listeners=SASL_PLAINTEXT://192.168.101.130:6667
advertised.listeners=SASL_PLAINTEXT://192.168.101.130:6667
# set protocol
security.inter.broker.protocol=SASL_PLAINTEXT
# SASL mechanism
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
# authorizer class (handles ACL-based authorization)
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# allow any operation if no ACL is found for a resource
allow.everyone.if.no.acl.found=true
super.users=User:admin
security.protocol=SASL_PLAINTEXT
- Add the broker's authentication info: config/kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-sec"
user_admin="admin-sec"
user_producer="prod-sec"
user_consumer="cons-sec";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-sec";
};
In the KafkaServer section, username/password are the credentials this broker uses when it initiates connections to other brokers, and each user_<userName>="<password>" entry defines a user that clients connecting to this broker (including other brokers) can authenticate as. The Client section holds the username/password the broker uses when it logs in to ZooKeeper; it must match a user defined in zk_server_jaas.conf.
- Modify the startup script bin/kafka-server-start.sh so the broker JVM loads the JAAS file:
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=$base_dir/../config/kafka_server_jaas.conf kafka.Kafka "$@"
- Start Kafka
/opt/lingmou/kafka/bin/kafka-server-start.sh -daemon /opt/lingmou/kafka/config/server.properties
- Kafka client configuration
# Consumer: config/kafka-consumer-jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="consumer"
password="cons-sec";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="consumer"
password="cons-sec";
};
# Producer: config/kafka-producer-jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="producer"
password="prod-sec";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="producer"
password="prod-sec";
};
- Modify the client configuration files (a Java sketch of the same settings follows this item)
# Producer: config/producer.properties
bootstrap.servers=192.168.101.130:6667
compression.type=none
# add the SASL authentication settings
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
# Consumer: config/consumer.properties
zookeeper.connect=192.168.101.130:2181
zookeeper.connection.timeout.ms=6000
group.id=test-group
# add the SASL authentication settings
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
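The same settings can also be applied programmatically. Below is a minimal Java producer sketch, assuming the 0.10.x kafka-clients jar is on the classpath; the class name SaslPlainProducerDemo and the absolute JAAS path are illustrative placeholders (on clients 0.10.2 and later the sasl.jaas.config property can be used instead of the java.security.auth.login.config system property).

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SaslPlainProducerDemo {
    public static void main(String[] args) {
        // Point the JVM at the producer JAAS file (the same file used by the console producer);
        // the path below is an assumption, adjust it to your installation.
        System.setProperty("java.security.auth.login.config",
                "/opt/lingmou/kafka/config/kafka-producer-jaas.conf");

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.101.130:6667");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The broker authenticates this client as user "producer" (user_producer in the broker JAAS),
            // which is granted Write permission on topic haha in the ACL step below.
            producer.send(new ProducerRecord<>("haha", "hello from user producer"));
            producer.flush();
        }
    }
}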
- Modify the client scripts so they load the corresponding JAAS file:
# Consumer: bin/kafka-console-consumer.sh
Change
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
to
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$(dirname $0)/../config/kafka-consumer-jaas.conf kafka.tools.ConsoleConsumer "$@"
# Producer: bin/kafka-console-producer.sh
Change
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
to
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$(dirname $0)/../config/kafka-producer-jaas.conf kafka.tools.ConsoleProducer "$@"
- Configure authorizations (ACLs); a programmatic AdminClient sketch for newer Kafka versions follows these commands
1) Create the topic haha
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic haha
2) Grant write (produce) permission to user producer on topic haha
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:producer --operation Write --topic haha
3) Grant read (consume) permission to user consumer on topic haha
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:consumer --operation Read --topic haha
4) Grant read permission on the consumer group test-group
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:consumer --operation Read --group test-group
5) List the configured ACLs
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --list
6) Remove a permission (here, the producer's Write ACL on topic haha)
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:producer --operation Write --topic haha
7) Deny read operations on topic haha from user zhangsan at IP 192.168.1.100, while allowing all other users
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:zhangsan --deny-host 192.168.1.100 --operation Read --topic haha
8) Allow read and write requests from users zhangsan and alice coming from IP 192.168.1.100 or 192.168.1.101
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:zhangsan --allow-principal User:alice --allow-host 192.168.1.100 --allow-host 192.168.1.101 --operation Read --operation Write --topic haha
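On newer Kafka versions (broker and client 0.11+ for the ACL API; the sketch below uses the 2.x form with ResourcePattern), the same Write ACL can be created programmatically with AdminClient instead of kafka-acls.sh. This is only a sketch for those later versions, authenticating as the super user admin; the class name AddWriteAclDemo is a placeholder, and the 0.10.0.1 broker used above does not support this API.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class AddWriteAclDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.101.130:6667");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // Newer clients can pass the JAAS entry inline instead of using a separate file
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"admin\" password=\"admin-sec\";");

        try (AdminClient admin = AdminClient.create(props)) {
            // Equivalent of: kafka-acls.sh --add --allow-principal User:producer --operation Write --topic haha
            AclBinding writeAcl = new AclBinding(
                    new ResourcePattern(ResourceType.TOPIC, "haha", PatternType.LITERAL),
                    new AccessControlEntry("User:producer", "*",
                            AclOperation.WRITE, AclPermissionType.ALLOW));
            admin.createAcls(Collections.singletonList(writeAcl)).all().get();
        }
    }
}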
- Test (a Java consumer counterpart follows the commands below)
1) Produce data
./bin/kafka-console-producer.sh --topic haha --broker-list localhost:6667 --producer.config config/producer.properties
2) Consume data
./bin/kafka-console-consumer.sh --topic haha --bootstrap-server localhost:6667 --consumer.config config/consumer.properties
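For completeness, a Java counterpart of the console consumer test above: a minimal sketch assuming the same consumer JAAS file and ACLs; the class name SaslPlainConsumerDemo and the JAAS path are placeholders.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SaslPlainConsumerDemo {
    public static void main(String[] args) {
        // JAAS file holding the "consumer" credentials; the path is an assumption.
        System.setProperty("java.security.auth.login.config",
                "/opt/lingmou/kafka/config/kafka-consumer-jaas.conf");

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.101.130:6667");
        props.put("group.id", "test-group"); // must match the group granted Read in the ACL step
        props.put("auto.offset.reset", "earliest");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("haha"));
            // poll(long) matches the 0.10.x client API used throughout this setup
            ConsumerRecords<String, String> records = consumer.poll(5000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }
}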