Kafka supports several authentication mechanisms, including SSL, SASL/Kerberos, and SASL/PLAIN; this walkthrough uses SASL/PLAIN.

Scope of Kafka authentication

Kafka clients to Kafka servers (brokers)
broker to broker
broker to ZooKeeper

ZooKeeper authentication
Create zk_server_jaas.conf in the conf directory under the ZooKeeper installation root:

Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="123456"
    user_admin="123456"
    user_zk001="123456"
    user_zk002="123456"
    user_zk003="123456"
    user_zk004="123456"
    user_zk005="123456";
};

Here username and password are the credentials brokers use to authenticate with ZooKeeper, while the user_* entries define the accounts ZooKeeper clients can authenticate as; for example, user_zk001="123456" declares user zk001 with password 123456.
Add the following to zoo.cfg:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
Copy jars from Kafka's libs directory into ZooKeeper's lib directory:

Authentication relies on the class org.apache.kafka.common.security.plain.PlainLoginModule, which ships in the kafka-clients jar, so that jar and its dependencies must be copied into the lib directory under the ZooKeeper installation root. Roughly these:

kafka-clients-2.1.1.jar
lz4-java-1.5.0.jar
osgi-resource-locator-1.0.1.jar
slf4j-api-1.7.25.jar
snappy-java-1.1.7.2.jar
Modify ZooKeeper's startup parameters: edit bin/zkEnv.sh and append at the end of the file:
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZOOCFGDIR/zk_server_jaas.conf"
Kafka broker authentication configuration

Create kafka_server_jaas.conf in the config directory:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="123456"
    user_admin="123456"
    user_alice="123456"
    user_write="123456"
    user_read="123456"
    user_kafka001="123456"
    user_kafka002="123456"
    user_kafka003="123456"
    user_kafka004="123456"
    user_kafka005="123456";
};
Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="123456";
};

The KafkaServer section defines the accounts that clients and other brokers may authenticate as; the Client section holds the credentials the broker uses to log in to ZooKeeper and must match the Server section of zk_server_jaas.conf.
Edit config/server.properties:

listeners=SASL_PLAINTEXT://192.168.180.128:8123
advertised.listeners=SASL_PLAINTEXT://192.168.180.128:8123
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
#allow.everyone.if.no.acl.found=true
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
zookeeper.set.acl=true
# listeners is the address the broker actually binds to.
# advertised.listeners is the address published to clients; if unset, listeners is used.
Edit bin/kafka-server-start.sh and add:

if [ "x$KAFKA_OPTS" = "x" ]; then
    export KAFKA_OPTS="-Djava.security.auth.login.config=/export/ap-comm/server/kafka_2.11-2.1.1/config/kafka_server_jaas.conf"
fi
Verification

Start Kafka:
../bin/kafka-server-start.sh server.properties
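With the broker up, the SASL setup can also be sanity-checked from code. Below is a minimal sketch (the class name KafkaAdminSaslCheck is made up for illustration) that authenticates as the admin super user over the listener configured above and lists topics; it uses the client-side sasl.jaas.config property instead of an external JAAS file:

package kafka;

import org.apache.kafka.clients.admin.AdminClient;

import java.util.Properties;

public class KafkaAdminSaslCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.180.128:8123");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // Inline JAAS configuration: log in as the super user from kafka_server_jaas.conf.
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"admin\" password=\"123456\";");
        try (AdminClient admin = AdminClient.create(props)) {
            // An authentication failure surfaces here as an exception.
            System.out.println("topics: " + admin.listTopics().names().get());
        }
    }
}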
Create zk_client_jaas.conf in the config directory:

Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="123456";
};
Edit bin/kafka-topics.sh and add:

if [ "x$KAFKA_OPTS" = "x" ]; then
    export KAFKA_OPTS="-Djava.security.auth.login.config=/export/ap-comm/server/kafka_2.11-2.1.1/config/zk_client_jaas.conf"
fi
Create a topic:
../bin/kafka-topics.sh --create --zookeeper localhost:2189 --replication-factor 1 --partitions 1 --topic test015
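Note that kafka-topics.sh talks to ZooKeeper directly, which is why it needs zk_client_jaas.conf rather than a broker JAAS file. Alternatively, the topic can be created over the broker's SASL listener; a fragment to drop into the try-with-resources block of the sanity-check sketch above (same admin Properties assumed):

// Create topic test015 with 1 partition and replication factor 1 via the broker protocol.
admin.createTopics(
        java.util.Collections.singletonList(
                new org.apache.kafka.clients.admin.NewTopic("test015", 1, (short) 1)))
        .all().get();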
Edit bin/kafka-acls.sh and add:

if [ "x$KAFKA_OPTS" = "x" ]; then
    export KAFKA_OPTS="-Djava.security.auth.login.config=/export/ap-comm/server/kafka_2.11-2.1.1/config/zk_client_jaas.conf"
fi
Grant the write and read users their permissions:

../bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2189 --add --allow-principal User:write --operation Write --topic test015
../bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2189 --add --allow-principal User:read --operation Read --group test-group --topic test015
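The same grants can also be issued programmatically with AdminClient.createAcls (the ResourcePattern API shown here needs kafka-clients 2.0 or newer). A minimal sketch under the setup above; the class name KafkaAclsSasl is illustrative:

package kafka;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

import java.util.Arrays;
import java.util.Properties;

public class KafkaAclsSasl {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.180.128:8123");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"admin\" password=\"123456\";");
        try (AdminClient admin = AdminClient.create(props)) {
            // User:write may produce to topic test015; User:read may consume from it
            // through consumer group test-group.
            AclBinding writeTopic = new AclBinding(
                    new ResourcePattern(ResourceType.TOPIC, "test015", PatternType.LITERAL),
                    new AccessControlEntry("User:write", "*", AclOperation.WRITE, AclPermissionType.ALLOW));
            AclBinding readTopic = new AclBinding(
                    new ResourcePattern(ResourceType.TOPIC, "test015", PatternType.LITERAL),
                    new AccessControlEntry("User:read", "*", AclOperation.READ, AclPermissionType.ALLOW));
            AclBinding readGroup = new AclBinding(
                    new ResourcePattern(ResourceType.GROUP, "test-group", PatternType.LITERAL),
                    new AccessControlEntry("User:read", "*", AclOperation.READ, AclPermissionType.ALLOW));
            admin.createAcls(Arrays.asList(writeTopic, readTopic, readGroup)).all().get();
        }
    }
}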
List all granted ACLs:
../bin/kafka-acls.sh --list --authorizer-properties zookeeper.connect=localhost:2189
Kafka client authentication configuration

Create kafka_write_jaas.conf under config/:

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="write"
    password="123456";
};
Edit bin/kafka-console-producer.sh and add:

if [ "x$KAFKA_OPTS" = "x" ]; then
    export KAFKA_OPTS="-Djava.security.auth.login.config=/export/ap-comm/server/kafka_2.11-2.1.1/config/kafka_write_jaas.conf"
fi
Create producer.config under config/:

bootstrap.servers=192.168.180.128:8123
compression.type=none
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
Start the producer to test:
../bin/kafka-console-producer.sh --broker-list 192.168.180.128:8123 --topic test015 --producer.config producer.config
Create kafka_read_jaas.conf under config/:

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="read"
    password="123456";
};
Edit bin/kafka-console-consumer.sh and add:

if [ "x$KAFKA_OPTS" = "x" ]; then
    export KAFKA_OPTS="-Djava.security.auth.login.config=/export/ap-comm/server/kafka_2.11-2.1.1/config/kafka_read_jaas.conf"
fi
Create consumer.config under config/:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
group.id=test-group
Start the consumer to test:
../bin/kafka-console-consumer.sh --bootstrap-server 192.168.180.128:8123 --topic test015 --from-beginning --consumer.config consumer.config
Java client authentication

Maven dependency (the full kafka_2.12 artifact is used here; a client application only needs org.apache.kafka:kafka-clients, ideally at a version matching the broker):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>0.11.0.1</version>
</dependency>
Producing data, KafkaProducerSasl.java:

package kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerSasl {
    public final static String TOPIC = "test015";

    private static void producer() throws InterruptedException {
        // Point the JVM at the JAAS file holding the write user's credentials.
        System.setProperty("java.security.auth.login.config", "E:/work/saslconf/kafka_write_jaas.conf");
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.180.128:8123");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");

        Producer<String, String> producer = new KafkaProducer<>(props);
        while (true) {
            long startTime = System.currentTimeMillis();
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<>(TOPIC, Integer.toString(i), Integer.toString(i)));
            }
            System.out.println(System.currentTimeMillis() - startTime);
            Thread.sleep(5000);
        }
    }

    public static void main(String[] args) {
        try {
            producer();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
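Since Kafka 0.10.2, clients can alternatively carry the JAAS configuration inline in the client properties via sasl.jaas.config, avoiding both the System.setProperty call and the external .conf file; the same applies to the consumer below. A fragment to swap into producer() above:

// Replaces the System.setProperty("java.security.auth.login.config", ...) line.
props.put("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"write\" password=\"123456\";");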
Consuming data, KafkaConsumerSasl.java:

package kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerSasl {
    public static void consumer() throws Exception {
        // Point the JVM at the JAAS file holding the read user's credentials.
        System.setProperty("java.security.auth.login.config", "E:/work/saslconf/kafka_read_jaas.conf");
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.180.128:8123");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("group.id", "test-group");
        props.put("session.timeout.ms", "6000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test015"));
        while (true) {
            long startTime = System.currentTimeMillis();
            ConsumerRecords<String, String> records = consumer.poll(1000);
            System.out.println(System.currentTimeMillis() - startTime);
            System.out.println("receive message number is " + records.count());
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s, partition = %d %n",
                        record.offset(), record.key(), record.value(), record.partition());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        consumer();
    }
}
More:

Kafka SSL in practice: https://www.orchome.com/1959
Kafka SASL/PLAIN authentication in practice: https://www.orchome.com/1960
Kafka SASL/SSL authentication in practice: https://www.orchome.com/10610
Kafka SASL/SCRAM in practice: https://www.orchome.com/1966
Kafka external-network forwarding: https://www.orchome.com/1903
