Kafka三种认证模式 使用kerberos认证 bootstrap.servers=hadoop01.com:9092,hadoop02.com:9092,hadoop03.com:9092,hadoop04.com:9092 security.protocol=SASL_PLAINTEXT sasl.kerberos.service.name=hadoop # 使用本地缓存 sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true;. # 使用keytab模式 #sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storekey=true keyTab="/var/lib/key/hadoop.keytab" principal="kafka@HADOOP.COM";
使用PLAIN认证模式 bootstrap.servers=hadoop01.com:9092,hadoop02.com:9092,hadoop03.com:9092,hadoop04.com:9092 security.protocol=SASL_PLAINTEXT sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka";
使用SSL安全模式 bootstrap.servers=hadoop01.com:9091,hadoop02.com:9091,hadoop03.com:9091,hadoop04.com:9091 security.protocol=SSL ssl.truststore.location=/var/lib/key/hadoop.keystore ssl.truststore.password=hadoop ssl.keystore.location=/var/lib/key/hadoop.keystore ssl.keystore.password=hadoop ssl.key.password=hadoop ssl.enabled.protocols=TLsv1.2,TLSv1.1,TLSv1 ssl.truststore.type=JKS
一般来说比较安全的认证方式采用Kerberos是比较合适的,不过Kerberos的门槛相对较高,所以采用类似用户名+密码的方式也是很常见的,本文讲述一下使用PLAIN也就是用户名+密码认证kafka的流程。
因为Kafka与ZK有依赖关系,因此也需要在ZK中配置相关的认证信息。
Zookeeper 配置 SASL 新建 zoo_jaas.conf 文件 zoo_jaas.conf文件名、文件所在路径没有特殊要求,一般放置在${ZOOKEEPER_HOME}/conf目录下,内容如下:
Server { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin" user_kafka="kafka@123"; };
Server.username、Server.password为 Zookeeper 内部通信的用户名和密码,因此保证每个 zk 节点该属性一致即可。 Server.user_xxx 中 xxx 为自定义用户名,用于 zkClient 连接所使用的用户名和密码,即为 kafka 创建的用户名。
配置 zoo.conf 文件 authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider requireClientAuthScheme=sasl jaasLoginRenew=3600000 zookeeper.sasl.client=true
zookeeper.sasl.client设置为true,开启客户端身份验证,否则zoo_jaas.conf中配置的用户名将不起作用,客户端仍然可以无 jaas 文件连接,只是带有 WARNNING 而已。
导入依赖包 因为使用的权限验证类为:org.apache.kafka.common.security.plain.PlainLoginModule,所以需要 kafka 相关 jar 包,新建文件夹 zk_sasl_lib,如下:
kafka-clients-2.4.1.jar lz4-java-1.6.0.jar slf4j-api-1.7.28.jar slf4j-log4j12-1.7.28.jar snappy-java-1.1.7.3.jar
修改 zkEnv.sh 文件 从:
export SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS"
修改为:
for jar in /Users/wjun/env/zookeeper/zk_sasl_lib/*.jar; do CLASSPATH="$jar:$CLASSPATH" done export SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS -Djava.security.auth.login.config=/Users/wjun/env/zookeeper/conf/zoo_jaas.conf"
重启 Zookeeper 服务即可。
Kakfa 配置 SASL 新建 kafka_server_jaas.conf 文件 kafka_server_jaas.conf文件名和存放路径没有要求,一般放置在${KAFKA_HOME}/config目录下:
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin@123" user_admin="admin@123" user_producer="producer@123" user_consumer="consumer@123"; }; Client{ org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka@123"; };
KafkaServer.username、KafkaServer.password 为 broker 内部通信的用户名密码,同上。 KafkaServer.user_xxx 其中 xxx 必须和 KafkaServer.username 配置的用户名一致,密码也一致。 KafkaServer.user_producer、KafkaServer.user_consumer 为了之后的 ACL 做准备,达到消费者生产者使用不同账号且消费者账号只能消费数据,生产者账号只能生产数据。 Client.username、Client.password 填写 Zookeeper 中注册的账号密码,用于 broker 与 zk 的通信(若 zk 没有配置 SASL 可以忽略、若 zookeeper.sasl.client 为 false 也可以忽略只是带有⚠️,日志如下):
WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: 'kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
修改 server.properties 文件 listeners=SASL_PLAINTEXT://localhost:9092 security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=PLAIN sasl.enabled.mechanisms=PLAIN allow.everyone.if.no.acl.found=false authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer super.users=User:admin
super.users 配置超级用户,该用户不受之后的 ACL 配置影响
修改启动脚本 修改 kafka-server-start.sh 文件,使之加载到 kafka_server_jaas.conf 文件,从:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" fi
修改为:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/Users/wjun/env/kafka/config/kafka_server_jaas.conf" fi
重启 kafka 服务即可。
Java API 验证 public class TProducer { public static void main(String[] args) throws IOException { // 创建配置类 Properties properties = new Properties(); // 加载生产者配置文件 properties.load(TProducer.class.getClassLoader().getResourceAsStream("producer.properties")); // 创建生产者对象 KafkaProducer<String, String> producer = new KafkaProducer<>(properties); ProducerRecord<String, String> producerRecord = new ProducerRecord<>("demo1", "key_1", "value_1"); producer.send(producerRecord, (metadata, exception) -> { if (exception == null) { System.out.println("消息发送至 --> " + metadata.topic() + " 偏移量为:" + metadata.offset()); } else { System.out.println("消息发送失败 " + exception.getMessage()); } }); producer.close(); } }
需要在producer.properties 中新增:
security.protocol=SASL_PLAINTEXT sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT、sasl.mechanism=PLAIN 必须配置。
配置密码 前面的配置只是提示了client需要使用PLAIN的认证,但是没有指明认证的账号,对于账号的配置有2中方式。
使用文件配置 创建 kafka_client_jaas.conf 文件:
KafkaClient{ org.apache.kafka.common.security.plain.PlainLoginModule required username="producer" password="producer@123"; };
程序启动时添加参数:
-Djava.security.auth.login.config=kafka_client_jaas.conf
修改配置项 在 producer.properties 添加:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="producer" password="producer@123";
启动程序成功生产数据:
[main] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in. [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.4.1 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: c57222ae8cd7866b [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1624965871345 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: l3Agv3weRiG27uo5EDj4KA [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. 消息发送至 --> demo1 偏移量为:18
消费者同理:
public class TConsumer { public static void main(String[] args) throws IOException { // 创建配置类 Properties properties = new Properties(); // 加载生产者配置文件 properties.load(TProducer.class.getClassLoader().getResourceAsStream("consumer.properties")); // 构建消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); // 订阅主题 consumer.subscribe(Collections.singletonList("demo1")); ConsumerRecords<String, String> records; while (true) { records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> record : records) { System.out.println(record.key() + "--" + record.value()); } } } }
不过消费的时候会有权限问题,也就是需要给用户进行授权,否则会有如下错误:
Exception in thread "main" org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [demo1]
Kafka ACL 配置用户具有某个主题的写权限,即生产数据:
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:producer --producer --topic demo1
配置用户具有某个主题的读权限,即消费数据:
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:consumer --consumer --topic demo1 --group test-consumer-group
查看 ACL 列表:
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic demo1
同时也可以取消用户权限,还可以限制 ip 等,具体 ACL操作见 kafka官方文档。
扫码手机观看或分享: