Kafka currently supports three authentication mechanisms: SSL, SASL/Kerberos, and SASL/PLAIN.

Scope of Kafka authentication

between Kafka clients and Kafka servers (brokers)
between brokers
between brokers and ZooKeeper

ZooKeeper authentication

In the conf directory under the ZooKeeper installation root, create a file named zk_server_jaas.conf:

    Server {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="123456"
        user_admin="123456"
        user_zk001="123456"
        user_zk002="123456"
        user_zk003="123456"
        user_zk004="123456"
        user_zk005="123456";
    };

Here username and password are used for authentication between the brokers and ZooKeeper, while the user_* entries are used for authentication between ZooKeeper clients and the ZooKeeper server; user_zk001="123456" means the username is zk001 and the password is 123456.

Add the following to zoo.cfg:

    authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    requireClientAuthScheme=sasl
    jaasLoginRenew=3600000

Copy jars from the Kafka libs directory into the ZooKeeper lib directory:
Authentication uses the class org.apache.kafka.common.security.plain.PlainLoginModule, which ships in kafka-clients.jar, so the relevant jars must be copied into the lib directory under the ZooKeeper installation root. Roughly these jars are needed:

    kafka-clients-2.1.1.jar
    lz4-java-1.5.0.jar
    osgi-resource-locator-1.0.1.jar
    slf4j-api-1.7.25.jar
    snappy-java-1.1.7.2.jar

Adjust ZooKeeper's startup parameters: edit bin/zkEnv.sh and append at the end of the file:

    SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZOOCFGDIR/zk_server_jaas.conf"

Kafka broker authentication configuration

Under the config directory, create kafka_server_jaas.conf:

    KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="123456"
        user_admin="123456"
        user_alice="123456"
        user_write="123456"
        user_read="123456"
        user_kafka001="123456"
        user_kafka002="123456"
        user_kafka003="123456"
        user_kafka004="123456"
        user_kafka005="123456";
    };
    Client {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="123456";
    };

In the KafkaServer section, username/password is the broker's own identity for inter-broker connections and the user_* entries define the accounts clients may log in with; the Client section holds the credentials the broker uses to connect to ZooKeeper and must match the ZooKeeper Server section above.

Edit config/server.properties:

    listeners=SASL_PLAINTEXT://192.168.180.128:8123
    advertised.listeners=SASL_PLAINTEXT://192.168.180.128:8123
    security.inter.broker.protocol=SASL_PLAINTEXT
    sasl.enabled.mechanisms=PLAIN
    sasl.mechanism.inter.broker.protocol=PLAIN
    #allow.everyone.if.no.acl.found=true
    super.users=User:admin
    authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
    zookeeper.set.acl=true
    # listeners is the address the broker actually binds to
    # advertised.listeners is the address published to clients; if unset, listeners is used

Edit bin/kafka-server-start.sh:

    if [ "x$KAFKA_OPTS" = "x" ]; then
        export KAFKA_OPTS="-Djava.security.auth.login.config=/export/ap-comm/server/kafka_2.11-2.1.1/config/kafka_server_jaas.conf"
    fi

This completes the authentication part of the configuration.

Verification

Start Kafka:

    ../bin/kafka-server-start.sh server.properties

Create zk_client_jaas.conf under the config directory:

    Client {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="123456";
    };

Edit bin/kafka-topics.sh and add the following:

    if [ "x$KAFKA_OPTS" = "x" ]; then
        export KAFKA_OPTS="-Djava.security.auth.login.config=/export/ap-comm/server/kafka_2.11-2.1.1/config/zk_client_jaas.conf"
    fi

Create a topic:

    ../bin/kafka-topics.sh --create --zookeeper localhost:2189 --replication-factor 1 --partitions 1 --topic test015
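
As an alternative to the ZooKeeper-based command above, a topic can also be created over the broker's SASL listener itself. A minimal sketch with the Java AdminClient, assuming a kafka-clients dependency (0.11 or later) and the admin account from kafka_server_jaas.conf; the class name is illustrative:

    package kafka;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    import java.util.Collections;
    import java.util.Properties;

    public class CreateTopicSasl {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.180.128:8123");
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "PLAIN");
            // Inline JAAS config (supported since Kafka 0.10.2) instead of a JAAS file
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required "
                            + "username=\"admin\" password=\"123456\";");
            try (AdminClient admin = AdminClient.create(props)) {
                // Same topic name, partition count and replication factor as the CLI command above
                admin.createTopics(Collections.singleton(new NewTopic("test015", 1, (short) 1)))
                        .all().get();
            }
        }
    }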

Edit bin/kafka-acls.sh and add the following:

    if [ "x$KAFKA_OPTS" = "x" ]; then
        export KAFKA_OPTS="-Djava.security.auth.login.config=/export/ap-comm/server/kafka_2.11-2.1.1/config/zk_client_jaas.conf"
    fi

Grant permissions to the write and read users:

    ../bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2189 --add --allow-principal User:write --operation Write --topic test015
    ../bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2189 --add --allow-principal User:read --operation Read --group test-group --topic test015
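
The same grants can be issued programmatically through the AdminClient ACL API. A minimal sketch, assuming kafka-clients 2.0 or later (for ResourcePattern) and again connecting as the admin super user; the class name is illustrative:

    package kafka;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.common.acl.AccessControlEntry;
    import org.apache.kafka.common.acl.AclBinding;
    import org.apache.kafka.common.acl.AclOperation;
    import org.apache.kafka.common.acl.AclPermissionType;
    import org.apache.kafka.common.resource.PatternType;
    import org.apache.kafka.common.resource.ResourcePattern;
    import org.apache.kafka.common.resource.ResourceType;

    import java.util.Arrays;
    import java.util.Properties;

    public class GrantAclsSasl {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.180.128:8123");
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required "
                            + "username=\"admin\" password=\"123456\";");
            try (AdminClient admin = AdminClient.create(props)) {
                // User:write may produce to test015
                AclBinding write = new AclBinding(
                        new ResourcePattern(ResourceType.TOPIC, "test015", PatternType.LITERAL),
                        new AccessControlEntry("User:write", "*", AclOperation.WRITE, AclPermissionType.ALLOW));
                // User:read may consume from test015 ...
                AclBinding read = new AclBinding(
                        new ResourcePattern(ResourceType.TOPIC, "test015", PatternType.LITERAL),
                        new AccessControlEntry("User:read", "*", AclOperation.READ, AclPermissionType.ALLOW));
                // ... using the consumer group test-group
                AclBinding group = new AclBinding(
                        new ResourcePattern(ResourceType.GROUP, "test-group", PatternType.LITERAL),
                        new AccessControlEntry("User:read", "*", AclOperation.READ, AclPermissionType.ALLOW));
                admin.createAcls(Arrays.asList(write, read, group)).all().get();
            }
        }
    }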

List all ACLs:

    ../bin/kafka-acls.sh --list --authorizer-properties zookeeper.connect=localhost:2189

Kafka client authentication configuration
Create kafka_write_jaas.conf under config/:

    KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="write"
        password="123456";
    };

Edit bin/kafka-console-producer.sh and add the following:

    if [ "x$KAFKA_OPTS" = "x" ]; then
        export KAFKA_OPTS="-Djava.security.auth.login.config=/export/ap-comm/server/kafka_2.11-2.1.1/config/kafka_write_jaas.conf"
    fi

Create producer.config under config/:

    bootstrap.servers=192.168.180.128:8123
    compression.type=none
    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=PLAIN
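
Note: on Kafka 0.10.2 and later, the JAAS file plus the KAFKA_OPTS edit can be avoided by putting the login module straight into producer.config (and likewise into consumer.config further below) via the sasl.jaas.config client property, e.g. with the write user from above:

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="write" password="123456";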

Start a producer to test:

    ../bin/kafka-console-producer.sh --broker-list 192.168.180.128:8123 --topic test015 --producer.config producer.config

Create kafka_read_jaas.conf under config/:

    KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="read"
        password="123456";
    };

Edit bin/kafka-console-consumer.sh:

    if [ "x$KAFKA_OPTS" = "x" ]; then
        export KAFKA_OPTS="-Djava.security.auth.login.config=/export/ap-comm/server/kafka_2.11-2.1.1/config/kafka_read_jaas.conf"
    fi

Create consumer.config under config/:

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=PLAIN
    group.id=test-group

Start a consumer to test:

    ../bin/kafka-console-consumer.sh --bootstrap-server 192.168.180.128:8123 --topic test015 --from-beginning --consumer.config consumer.config

Java client authentication

Maven dependency:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>0.11.0.1</version>
    </dependency>

Producing data, KafkaProducerSasl.java:

    package kafka;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.util.Properties;

    public class KafkaProducerSasl {
        public final static String TOPIC = "test010";

        private static void producer() throws InterruptedException {
            // Point the JVM at the JAAS file that holds the SASL credentials
            System.setProperty("java.security.auth.login.config", "E:/work/saslconf/kafka_write_jaas.conf");

            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.180.128:8123");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // SASL/PLAIN over a plaintext connection, matching the broker listener
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "PLAIN");

            Producer<String, String> producer = new KafkaProducer<>(props);
            while (true) {
                long startTime = System.currentTimeMillis();
                for (int i = 0; i < 100; i++) {
                    producer.send(new ProducerRecord<>(TOPIC, Integer.toString(i), Integer.toString(i)));
                }
                System.out.println(System.currentTimeMillis() - startTime);
                Thread.sleep(5000);
            }
        }

        public static void main(String[] args) {
            try {
                producer();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
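
With ACLs in force a send can fail asynchronously, for example with a TopicAuthorizationException when the principal lacks Write permission on the topic, and the fire-and-forget loop above never sees the error. A small sketch of the same send with a callback that surfaces such failures:

    // Replacement for the producer.send(...) line above: the callback receives
    // a non-null exception on failure (e.g. an authorization error) instead of
    // the error being silently dropped.
    producer.send(new ProducerRecord<>(TOPIC, Integer.toString(i), Integer.toString(i)),
            (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                }
            });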


Consuming data, KafkaConsumerSasl.java:

    package kafka;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.util.Collections;
    import java.util.Properties;

    public class KafkaConsumerSasl {
        public static void consumer() throws Exception {
            // Point the JVM at the JAAS file that holds the SASL credentials
            System.setProperty("java.security.auth.login.config", "E:/work/saslconf/kafka_read_jaas.conf");

            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.180.128:8123");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            // Must be a group the read user is authorized for (see the ACLs above)
            props.put("group.id", "test-group");
            props.put("session.timeout.ms", "6000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "PLAIN");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("test010"));
            while (true) {
                long startTime = System.currentTimeMillis();
                ConsumerRecords<String, String> records = consumer.poll(1000);
                System.out.println(System.currentTimeMillis() - startTime);
                System.out.println("received message number is " + records.count());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s, partition = %d %n",
                            record.offset(),
                            record.key(),
                            record.value(),
                            record.partition());
                }
            }
        }

        // http://www.open-open.com/lib/view/open1412991579999.html
        public static void main(String[] args) throws Exception {
            consumer();
        }
    }
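
The poll loop above runs forever and never closes the consumer. A common shutdown idiom, sketched here as an optional addition rather than part of the original example, is to interrupt poll() from a JVM shutdown hook and close the consumer in a finally block:

    // Inside consumer(): install a hook that wakes the consumer up on Ctrl+C.
    final Thread mainThread = Thread.currentThread();
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        consumer.wakeup();  // makes the blocked poll() throw WakeupException
        try {
            mainThread.join();
        } catch (InterruptedException ignored) {
        }
    }));
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            // ... process records as above ...
        }
    } catch (org.apache.kafka.common.errors.WakeupException e) {
        // expected during shutdown
    } finally {
        consumer.close();  // commits final offsets (auto-commit) and leaves the group
    }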

More:
Kafka in practice, SSL: https://www.orchome.com/1959
Kafka in practice, SASL/PLAIN authentication: https://www.orchome.com/1960
Kafka in practice, SASL/SSL authentication: https://www.orchome.com/10610
Kafka in practice, SASL/SCRAM: https://www.orchome.com/1966
Kafka external-network forwarding: https://www.orchome.com/1903