Kafka + ZooKeeper with SASL/PLAIN authentication

ZooKeeper configuration

1. Edit zoo.cfg and add the following three lines:

```properties
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=360000
```
2. Create the JAAS file zk_server_jaas.conf in the conf directory (it defines the usernames and passwords that clients must use when connecting to the ZooKeeper server), e.g. `vim conf/zk_server_jaas.conf`:

```
Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-sec"
    user_admin="admin-sec";
};
```
3. Add the required jars by copying them from kafka/libs into zookeeper/lib (a one-line copy command is sketched below):

```
kafka-clients-0.10.0.1.jar
lz4-1.3.0.jar
slf4j-api-1.7.21.jar
slf4j-log4j12-1.7.21.jar
snappy-java-1.1.2.6.jar
```
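The copy can be done in one command; the paths assume the /opt/lingmou/kafka and /opt/lingmou/zookeeper install directories used later in this guide:

```bash
# Copy the Kafka client and supporting jars into ZooKeeper's lib directory.
cp /opt/lingmou/kafka/libs/kafka-clients-0.10.0.1.jar \
   /opt/lingmou/kafka/libs/lz4-1.3.0.jar \
   /opt/lingmou/kafka/libs/slf4j-api-1.7.21.jar \
   /opt/lingmou/kafka/libs/slf4j-log4j12-1.7.21.jar \
   /opt/lingmou/kafka/libs/snappy-java-1.1.2.6.jar \
   /opt/lingmou/zookeeper/lib/
```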
4. Edit zkEnv.sh and append the following line at the end:

```bash
export SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZOOKEEPER_PREFIX/conf/zk_server_jaas.conf"
```
5. Start ZooKeeper (a quick health check is sketched after this step):

```bash
/opt/lingmou/zookeeper/bin/zkServer.sh start
```
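To confirm ZooKeeper is up and loaded the JAAS configuration, check its status and scan the log for SASL messages or login errors; the zookeeper.out location depends on where the server was started, so adjust the path as needed:

```bash
# Report whether the server is running.
/opt/lingmou/zookeeper/bin/zkServer.sh status

# Look for SASL-related messages or JAAS login errors in the ZooKeeper log.
grep -iE "sasl|LoginException" zookeeper.out | tail -n 20
```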

Kafka broker configuration

1. Edit config/server.properties:

```properties
listeners=SASL_PLAINTEXT://192.168.101.130:6667
advertised.listeners=SASL_PLAINTEXT://192.168.101.130:6667
# security protocol used between brokers
security.inter.broker.protocol=SASL_PLAINTEXT
# enabled SASL mechanisms
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
# authorizer class that enforces ACLs
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# if no ACL (access control list) is found for a resource, allow all operations
allow.everyone.if.no.acl.found=true
super.users=User:admin
security.protocol=SASL_PLAINTEXT
```
2. Add the broker's authentication information in config/kafka_server_jaas.conf. In the KafkaServer section, username/password are the credentials this broker uses when it initiates connections to other brokers, while each user_<name>="<password>" entry defines a user that clients connecting to this broker (including other brokers) may authenticate as. The Client section holds the username and password the broker uses to log in to ZooKeeper:

```
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-sec"
    user_admin="admin-sec"
    user_producer="prod-sec"
    user_consumer="cons-sec";
};

Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-sec";
};
```
3. Edit the startup script bin/kafka-server-start.sh so its last line loads the JAAS file (an alternative using KAFKA_OPTS is sketched after this step):

```bash
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=$base_dir/../config/kafka_server_jaas.conf kafka.Kafka "$@"
```
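If you would rather not patch the script, kafka-run-class.sh also appends whatever is in the KAFKA_OPTS environment variable to the JVM options, so the JAAS file can be supplied that way instead; a sketch assuming the /opt/lingmou/kafka install path used below:

```bash
# Pass the broker JAAS file through KAFKA_OPTS instead of editing kafka-server-start.sh.
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/lingmou/kafka/config/kafka_server_jaas.conf"
/opt/lingmou/kafka/bin/kafka-server-start.sh -daemon /opt/lingmou/kafka/config/server.properties
```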
4. Start Kafka (a quick check that the broker is up follows this step):

```bash
/opt/lingmou/kafka/bin/kafka-server-start.sh -daemon /opt/lingmou/kafka/config/server.properties
```
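To confirm the broker came up and is listening on the SASL_PLAINTEXT port, check the server log and the socket; a rough sanity check, with the log assumed to be logs/server.log under the install directory:

```bash
# Kafka logs a "started (kafka.server.KafkaServer)" line once startup completes.
grep "started (kafka.server.KafkaServer)" /opt/lingmou/kafka/logs/server.log | tail -n 1

# Confirm the listener on port 6667 is open.
ss -lntp | grep 6667
```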
5. Kafka client configuration. Create one JAAS file for the consumer and one for the producer; the KafkaClient section holds the credentials used to authenticate to the broker, and the Client section is used when the tool also connects to ZooKeeper (an inline alternative for newer clients is sketched after this step).

Consumer, config/kafka-consumer-jaas.conf:

```
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="consumer"
    password="cons-sec";
};

Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="consumer"
    password="cons-sec";
};
```

Producer, config/kafka-producer-jaas.conf:

```
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="producer"
    password="prod-sec";
};

Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="producer"
    password="prod-sec";
};
```
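On Kafka clients 0.10.2 and newer, the JAAS section can instead be inlined into the client properties via sasl.jaas.config, avoiding separate JAAS files; this setup uses 0.10.0.1 jars, so the files above are still needed. A sketch of the newer style, reusing the producer credentials defined here:

```bash
# Only works with client libraries >= 0.10.2; shown for reference.
cat >> config/producer.properties <<'EOF'
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="producer" password="prod-sec";
EOF
```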
6. Edit the client configuration files and add the authentication settings.

Producer, config/producer.properties:

```properties
bootstrap.servers=192.168.101.130:6667
compression.type=none
# authentication settings
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
```

Consumer, config/consumer.properties:

```properties
zookeeper.connect=192.168.101.130:2181
zookeeper.connection.timeout.ms=6000
group.id=test-group
# authentication settings
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
```
7. Edit the console client scripts so each one loads its JAAS file (an alternative using KAFKA_OPTS is sketched after this step).

Consumer, bin/kafka-console-consumer.sh, change

```bash
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
```

to

```bash
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$(dirname $0)/../config/kafka-consumer-jaas.conf kafka.tools.ConsoleConsumer "$@"
```

Producer, bin/kafka-console-producer.sh, change

```bash
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
```

to

```bash
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$(dirname $0)/../config/kafka-producer-jaas.conf kafka.tools.ConsoleProducer "$@"
```
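The console tools also run through kafka-run-class.sh, so the same KAFKA_OPTS approach works here and avoids editing the scripts; run producer and consumer in separate shells so the two settings do not overwrite each other:

```bash
# Producer shell: load the producer JAAS file via KAFKA_OPTS.
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/lingmou/kafka/config/kafka-producer-jaas.conf"

# Consumer shell: load the consumer JAAS file via KAFKA_OPTS.
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/lingmou/kafka/config/kafka-consumer-jaas.conf"
```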
8. Grant permissions (ACLs). A command to review what was granted is sketched after this list.

1) Create the topic haha:

```bash
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic haha
```

2) Grant write (produce) permission:

```bash
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:producer --operation Write --topic haha
```

3) Grant read (consume) permission:

```bash
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:consumer --operation Read --topic haha
```

4) Grant read permission on the consumer group:

```bash
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:consumer --operation Read --group test-group
```

5) List the configured ACLs:

```bash
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --list
```

6) Remove a permission:

```bash
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:producer --operation Write --topic haha
```

7) Deny Read for user zhangsan from IP 192.168.1.100 while allowing everyone else:

```bash
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal 'User:*' --allow-host '*' --deny-principal User:zhangsan --deny-host 192.168.1.100 --operation Read --topic haha
```

8) Allow Read and Write for users zhangsan and alice from IP 192.168.1.100 or 192.168.1.101:

```bash
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:zhangsan --allow-principal User:alice --allow-host 192.168.1.100 --allow-host 192.168.1.101 --operation Read --operation Write --topic haha
```
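To double-check what was granted for this exercise, the listing can be filtered by resource (the --topic/--group filters mirror the flags used when adding the ACLs; treat this as a sketch):

```bash
# ACLs attached to the topic.
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic haha

# ACLs attached to the consumer group.
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --group test-group
```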
9. Test (a non-interactive smoke test is sketched after this step).

1) Produce data:

```bash
./bin/kafka-console-producer.sh --topic haha --broker-list localhost:6667 --producer.config config/producer.properties
```

2) Consume data:

```bash
./bin/kafka-console-consumer.sh --topic haha --bootstrap-server localhost:6667 --consumer.config config/consumer.properties
```
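For a quick non-interactive smoke test, a single message can be piped through the producer and read back with the consumer; --from-beginning and --timeout-ms are standard console-consumer options, and the paths and ports follow the setup above:

```bash
# Send one test message as the producer user.
echo "hello-sasl" | ./bin/kafka-console-producer.sh --topic haha --broker-list localhost:6667 --producer.config config/producer.properties

# Read the topic from the beginning as the consumer user and exit after 10 seconds of inactivity.
./bin/kafka-console-consumer.sh --topic haha --bootstrap-server localhost:6667 --consumer.config config/consumer.properties --from-beginning --timeout-ms 10000
```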