分享

Kafka SASL安全认证机制与ACL用户权限控制

 塞北de雪 2024-04-01 发布于江苏

目录

概述说明

一、认证

二、信道加密

三、授权

环境搭建(Kafka安全认证 SASL/PLAINTEXT)

一、Zookeeper集群配置SASL

二、Kafka集群配置SASL

认证使用

一、客户端调用认证

二、Kafka 用户权限控制 ACL

问题整理

一、配置SASL_PLAINTEXT认证和ACLS授权常见问题


概述说明

        自0.9.0.0.版本引入Security之后,Kafka一直在完善security的功能,以提高kafka集群的安全性。当前Kafka security主要包含3大功能:认证(authentication)、信道加密(encryption)和授权(authorization)。

一、认证

1.Kafka SASL的认证范围包含如下:
  • Client与Broker之间
  • Broker与Broker之间
  • Broker与Zookeeper之间
2.在Kafka系统中,SASL三种实现机制:
  • Kerberos(SASL/GSSAPI - starting at version 0.9.0.0)
  • PLAIN(SASL/PLAIN - starting at version 0.10.0.0)
  • SCRAM(SASL/SCRAM-SHA-256 and SASL/SCRAM-SHA-512 - starting at version 0.10.2.0)

        SASL(Simple Authentication and Security Layer)只是一个安全层框架,具体实现是Kerberos、PLAIN或SCRAM。

        对于一些小公司而言,他们的用户系统并不复杂,集群用户不多,而且由于运行在内网环境,SSL加密也不是很必要。SASL/PALIN而言, 运维成本相对较小,适合小公司中的Kafka集群,弊端是不能动态增减用户。
因此一个SASL/PLAINTEXT+ACL的集群环境足以应付一般的使用场景。下面我使用最新的Kafka 2.4.1版本来演示下如何构建支持SASL(PLAINTEXT) + ACL来构建secured Kafka集群

3.kafka不同的认证机制:

二、信道加密

        信道加密就是为client到broker、broker到broker以及工具脚本与broker之间的数据传输配置SSL。SSL(Secure Sockets Layer)在TCP握手环节使用非对称加密,在数据传输环节使用对称加密。因为非对称加密比对称加密耗时,但安全性比较高,因此在此即提高了安全性,也提高了速度。

        在Kafka系统中,SSL协议作为信道加密机制默认是禁止的,如果需要使用,可以手动启动SSL机制。在一般的上产环境中不建议启动SSL,因为使用SSL就不能使用kafka的ZORE-COPY特性,会大大降低消费时的速度。

三、授权

        授权是通过ACL(Access Control Lists)接口命令来完成用户权限控制。

环境搭建(Kafka安全认证 SASL/PLAINTEXT)

操作系统:            CentOS release 6.3 (Final)

Kafka Version:     kafka_2.11-2.4.1

Zookeeper Version:3.4.13

一、Zookeeper集群配置SASL

1、zoo.cfg文件配置

为zookeeper添加SASL支持,在配置文件zoo.cfg添加,内容如下:

  1. authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
  2. requireClientAuthScheme=sasl
  3. jaasLoginRenew=3600000
2.新建zk_server_jaas.conf文件,为Zookeeper添加账号认证信息

这个文件你放在哪里随意,只要后面zkEnv.sh配置正确的路径就好了。我是放在$ZK_HOME/zk_sasl_dependencies 路径下。zk_server_jaas.conf 文件的内容如下:

  1. Server {
  2. org.apache.kafka.common.security.plain.PlainLoginModule required
  3. username="zk_cluster"
  4. password="zk_cluster_passwd"
  5. user_kafka="zk_kafka_client_passwd";
  6. };

username和paasword是zk集群之间认证的用户名密码。
user_kafka=“zk_kafka_client_passwd"定义了一个用户"kafka”,密码是"zk_kafka_client_passwd"
user_{username}=“password”:形式是定义zk集群的客户端的用户名和密码

3.将Kafka认证相关jar包导入到Zookeeper

        Zookeeper的认证机制是使用插件,“org.apache.kafka.common.security.plain.PlainLoginModule”,所以需要导入Kafka相关jar包,kafka-clients相关jar包,在kafka服务下的lib目录中可以找到,根据kafka不同版本,相关jar包版本会有所变化。

        所需要jar包如下,在zookeeper下创建目录zk_sasl_dependencies将jar包放入(目录名与位置可以随便,后续引用指定即可):

  1. kafka-clients-2.4.1.jar
  2. lz4-java-1.6.0.jar
  3. slf4j-api-1.7.28.jar
  4. slf4j-log4j12-1.7.28.ja
  5. snappy-java-1.1.7.3.jar
4.修改zkEnv.sh

主要目的:

  1. 将这几个jar包使zookeeper读取到
  2. 加载zk_server_jaas.conf配置文件

在 zkEnv.sh 文件最后添加如下内容:

  1. #为zookeeper添加SASL支持
  2. for i in $ZK_HOME/zk_sasl_dependencies/*.jar;
  3. do
  4. CLASSPATH="$i:$CLASSPATH"
  5. done
  6. SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZK_HOME/zk_sasl_dependencies/zk_server_jaas.conf "
5.启动Zookeeper服务

在每个ZK节点都要做相同的配置,配置完成启动ZK集群。

我搭建的是伪分布式zk集群,在这里附上我的zoo.cfg 配置:

zoo1.cfg

  1. tickTime=2000
  2. initLimit=10
  3. syncLimit=5
  4. dataDir=/home/hadoop/zk/data/1
  5. dataLogDir=/home/hadoop/zk/logs/1
  6. clientPort=8181
  7. #伪集群部署
  8. server.1=10.194.202.17:2881:3881
  9. server.2=10.194.202.17:2882:3882
  10. server.3=10.194.202.17:2883:3883
  11. #为zookeeper添加SASL支持
  12. authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
  13. requireClientAuthScheme=sasl
  14. jaasLoginRenew=3600000

zoo2.cfg

  1. tickTime=2000
  2. initLimit=10
  3. syncLimit=5
  4. dataDir=/home/hadoop/zk/data/2
  5. dataLogDir=/home/hadoop/zk/logs/2
  6. clientPort=8182
  7. #伪集群部署
  8. server.1=10.194.202.17:2881:3881
  9. server.2=10.194.202.17:2882:3882
  10. server.3=10.194.202.17:2883:3883
  11. #为zookeeper添加SASL支持
  12. authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
  13. requireClientAuthScheme=sasl
  14. jaasLoginRenew=3600000

zoo3.cfg

  1. tickTime=2000
  2. initLimit=10
  3. syncLimit=5
  4. dataDir=/home/hadoop/zk/data/3
  5. dataLogDir=/home/hadoop/zk/logs/3
  6. clientPort=8183
  7. #伪集群部署
  8. server.1=10.194.202.17:2881:3881
  9. server.2=10.194.202.17:2882:3882
  10. server.3=10.194.202.17:2883:3883
  11. #为zookeeper添加SASL支持
  12. authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
  13. requireClientAuthScheme=sasl
  14. jaasLoginRenew=3600000

zk_cluster.sh

  1. #!/usr/bin/env bash
  2. # --------------------------------------------------------------
  3. # Author :lijianjun
  4. # Create Time :2020-04-20 12:56
  5. # Description :zookeeper cluster 管理脚本
  6. # --------------------------------------------------------------
  7. . ~/.bashrc
  8. ZK_HOME="/home/hadoop/zk"
  9. alias zkServer.sh=$ZK_HOME/bin/zkServer.sh
  10. alias zkCli.sh=$ZK_HOME/bin/zkCli.sh
  11. ZOOKEEPERS="10.194.202.17:8181,10.194.202.17:8182,10.194.202.17:8183"
  12. ZK_NODES=3
  13. function start_zk(){
  14. for((i=1;i<=$ZK_NODES;i++))
  15. do
  16. zkServer.sh start $ZK_HOME/conf/zoo$i.cfg
  17. done
  18. }
  19. function stop_zk(){
  20. for((i=1;i<=$ZK_NODES;i++))
  21. do
  22. zkServer.sh stop $ZK_HOME/conf/zoo$i.cfg
  23. done
  24. }
  25. function status_zk(){
  26. for((i=1;i<=$ZK_NODES;i++))
  27. do
  28. zkServer.sh status $ZK_HOME/conf/zoo$i.cfg
  29. done
  30. }
  31. function restart_zk(){
  32. for((i=1;i<=$ZK_NODES;i++))
  33. do
  34. zkServer.sh restart $ZK_HOME/conf/zoo$i.cfg
  35. done
  36. }
  37. function init_clean(){
  38. stop_zk
  39. rm -rf $ZK_HOME/data/*
  40. rm -rf $ZK_HOME/logs/*
  41. for((i=1;i<=$ZK_NODES;i++))
  42. do
  43. mkdir -p $ZK_HOME/data/$i
  44. echo $i > $ZK_HOME/data/$i/myid
  45. done
  46. }
  47. function init_zk(){
  48. init_clean
  49. start_zk
  50. }
  51. function conn_cluster(){
  52. zkCli.sh -server $ZOOKEEPERS
  53. }
  54. case "$1" in
  55. start)
  56. start_zk
  57. ;;
  58. stop)
  59. stop_zk
  60. ;;
  61. status)
  62. status_zk
  63. ;;
  64. restart)
  65. restart_zk
  66. ;;
  67. conn)
  68. conn_cluster
  69. ;;
  70. clean)
  71. init_clean
  72. ;;
  73. init)
  74. init_zk
  75. ;;
  76. *)
  77. echo "Usage: $0 {start|stop|restart|status|init|clean|conn}"
  78. RETVAL=1
  79. esac
  80. exit $RETVAL

二、Kafka集群配置SASL

1.创建kafka_server_jaas.conf文件,该文件名可以自己修改,为kafka添加认证信息

        内容如下(这里的Client与Zookeeper相对应,KafkaServer与后面调用时读取的KafkaClient相对应,是消费生产的账号密码,不要弄混了):

  1. KafkaServer {
  2. org.apache.kafka.common.security.plain.PlainLoginModule required
  3. username="kafkacluster"
  4. password="kafkaclusterpasswd"
  5. user_kafkacluster="kafkaclusterpasswd"
  6. user_admin="adminpasswd"
  7. user_consumer="consumerpasswd"
  8. user_producer="producerpasswd";
  9. };
  10. Client{
  11. org.apache.kafka.common.security.plain.PlainLoginModule required
  12. username="kafka"
  13. password="zk_kafka_client_passwd";
  14. };

  KafkaServer模块,使用user_{name}来定义多个用户,供客户端程序(生产者、消费者程序或kafka broker之间)认证使用,可以定义多个,后续配置可能还可以根据不同的用户定义不同的ACL权限。上例我定义了三个用户,一个是admin,一个是kafkacluster,一个是producer,一个是consumer,等号后面是对应用户的密码(如user_producer定义了用户名为producer,密码为producerpasswd的用户)。再选择一个用户,用于Kafka内部的各个broker之间通信,这里我选择kafkacluster用户,对应的密码是"kafkaclusterpasswd"。

  Client模块,主要是kafka broker链接到zookeeper,最为zookeeper的一个客户端。从上文的Zookeeper JAAS文件中选择一个用户kafka,密码为"zk_kafka_client_passwd"。

2.在Kafka server.properties添加、修改如下信息
  1. # Set listeners
  2. listeners=SASL_PLAINTEXT://10.194.202.17:8092
  3. ## Set protocol
  4. sasl.mechanism.inter.broker.protocol=PLAIN
  5. sasl.enabled.mechanisms=PLAIN
  6. ###security.inter.broker.protocol和inter.broker.listener.name 只能设置一个,否则报错
  7. security.inter.broker.protocol=SASL_PLAINTEXT
  8. # set acl
  9. ## default false | true to accept all the users to use it
  10. allow.everyone.if.no.acl.found=false
  11. authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
  12. ##添加超级用户,超级用户必须在kafka_server_jaas.conf文件 KafkaServer模块授权,如:user_admin="adminpasswd"
  13. super.users=User:admin;User:kafkacluster
3.Kafka启动脚本中加入配置,把第一步创建的文件 kafka_server_jaas.conf 加载到启动环境中

修改kafka的kafka-server-start.sh文件

  1. 修改 export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
  2. 为 export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=$KAFKA_HOME/config/kafka_server_jaas.conf"

或者 在启动kafka之前执行一下脚本来加载到启动环境中(本人是采用的这种做法)

export KAFKA_HEAP_OPTS=$KAFKA_HEAP_OPTS" -Djava.security.auth.login.config=$KAFKA_HOME/config/kafka_server_jaas.conf"
4.启动Kafka服务

kafka server.properties 完整配置

  1. # Set listeners
  2. listeners=SASL_PLAINTEXT://10.194.202.17:8092
  3. ## Set protocol
  4. sasl.mechanism.inter.broker.protocol=PLAIN
  5. sasl.enabled.mechanisms=PLAIN
  6. ###security.inter.broker.protocol和inter.broker.listener.name 只能设置一个,否则报错
  7. security.inter.broker.protocol=SASL_PLAINTEXT
  8. # set acl
  9. # default false | true to accept all the users to use it
  10. allow.everyone.if.no.acl.found=false
  11. authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
  12. #添加超级用户,超级用户必须在kafka_server_jaas.conf文件 KafkaServer模块授权,如:user_admin="adminpasswd"
  13. super.users=User:admin;User:kafkacluster
  14. log.retention.hours=168
  15. zookeeper.connect=10.194.202.17:8181,10.194.202.17:8182,10.194.202.17:8183/kafka
  16. compression.type=producer
  17. delete.topic.enable=true
  18. log.dirs=/home/hadoop/kafka/data
  19. log.dir=/home/hadoop/kafka/data
  20. num.partitions=1
  21. min.insync.replicas=1
  22. default.replication.factor=3
  23. auto.create.topics.enable=false
  24. log.retention.check.interval.ms=300000

启动:kafka-server-start.sh $KAFKA_HOME/config/server.properties

认证使用

      由于使用consumer和producer用户之前先需要ACL授权,ACL授权将在下面讲解,在这里先使用超级用户:admin,超级用户不受ACL权限控制。

一、客户端调用认证

有以下三种方式

1.命令行脚本客户端访问

配置 kafka_client_jaas.conf 文件

  1. KafkaClient{
  2. org.apache.kafka.common.security.plain.PlainLoginModule required
  3. username="admin"
  4. password="adminpasswd";
  5. };

在启动kafka 生产或者消费之前执行一下脚本来加载到启动环境中

export KAFKA_HEAP_OPTS=$KAFKA_HEAP_OPTS" -Djava.security.auth.login.config=$KAFKA_HOME/config/kafka_client_jaas.conf"

consumer、producer 启动脚本

  1. #consumer
  2. kafka-console-consumer.sh --bootstrap-server 10.194.202.17:8092 \
  3. --consumer-property security.protocol=SASL_PLAINTEXT \
  4. --consumer-property sasl.mechanism=PLAIN \
  5. --consumer-property client.id=consumer_01 \
  6. --topic test.1 \
  7. --group c1
  8. #producer
  9. kafka-console-producer.sh --broker-list 10.194.202.17:8092 \
  10. --consumer-property security.protocol=SASL_PLAINTEXT \
  11. --consumer-property sasl.mechanism=PLAIN \
  12. --consumer-property client.id=producer_01 \
  13. --topic test.1

配置sasl.conf文件

  1. security.protocol=SASL_PLAINTEXT
  2. sasl.mechanism=PLAIN
  3. sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="adminpasswd";

consumer、producer 启动脚本

  1. #consumer
  2. kafka-console-consumer.sh --bootstrap-server 10.194.202.17:8092 \
  3. --consumer.config $KAFKA_HOME/config/sasl.conf \
  4. --consumer-property client.id=consumer_01 \
  5. --topic test.1 \
  6. --group c1
  7. #producer
  8. kafka-console-producer.sh --broker-list 10.194.202.17:8092 \
  9. --consumer.config $KAFKA_HOME/config/sasl.conf \
  10. --consumer-property client.id=producer_01 \
  11. --topic test.1
2.拼接sasl认证(本人采用这种方式)

consumer、producer 启动脚本

  1. #consumer
  2. kafka-console-consumer.sh --bootstrap-server 10.194.202.17:8092 \
  3. --consumer-property "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='adminpasswd';" \
  4. --consumer-property security.protocol=SASL_PLAINTEXT \
  5. --consumer-property sasl.mechanism=PLAIN \
  6. --consumer-property client.id=consumer_01 \
  7. --topic test.1 \
  8. --group c1
  9. #producer
  10. kafka-console-producer.sh --broker-list 10.194.202.17:8092 \
  11. --consumer-property "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='adminpasswd';" \
  12. --consumer-property security.protocol=SASL_PLAINTEXT \
  13. --consumer-property sasl.mechanism=PLAIN \
  14. --consumer-property client.id=producer_01 \
  15. --topic test.1
3.java客户端访问

有以下两种方式

①配置 kafka_client_jaas.conf 文件
  1. KafkaClient{
  2. org.apache.kafka.common.security.plain.PlainLoginModule required
  3. username="admin"
  4. password="adminpasswd";
  5. };

在producer 或者 consumer增加以下三项配置:

  1. props.put("security.protocol", "SASL_PLAINTEXT");
  2. props.put("sasl.mechanism", "PLAIN");
  3. System.setProperty("java.security.auth.login.config", "conf/kafka_client_jaas.conf");
②无配置文件模式(本人采用这种配置)

在producer 或者 consumer增加以下三项配置:

  1. props.put("security.protocol", "SASL_PLAINTEXT");
  2. props.put("sasl.mechanism", "PLAIN");
  3. props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"adminpasswd\";");

样例:

  1. package com.kafka;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import java.util.Properties;
  5. import java.util.concurrent.ExecutionException;
  6. /**
  7. * Created by lijianjun 2020/4/15
  8. */
  9. public class Kafka_Producer {
  10. public static void main(String[] args) {
  11. Properties props = new Properties();
  12. props.put("bootstrap.servers", "10.194.202.17:8092");
  13. props.put("client.id", "producer_01");
  14. props.put("acks", "all");
  15. props.put("retries", 0);
  16. props.put("batch.size", 16384);
  17. props.put("linger.ms", 1);
  18. props.put("buffer.memory", 33554432);
  19. props.put("compression.type", "gzip");//none, gzip, snappy, 或者 lz4. 默认none
  20. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  21. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  22. // Java客户端调用认证
  23. props.put("security.protocol", "SASL_PLAINTEXT");
  24. props.put("sasl.mechanism", "PLAIN");
  25. props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"producer\" password=\"producerpasswd\";");
  26. //System.setProperty("java.security.auth.login.config", "C:/Kafkaanquan/kafka_client_jaas.conf"); 读取配置文件方式,与读取配置文件二选一
  27. KafkaProducer<String, String> producer = new KafkaProducer<>(props);
  28. try {
  29. for (int i = 0; i < 10; i++) {
  30. producer.send(new ProducerRecord<String, String>("test.1", Integer.toString(i), Integer.toString(i))).get();
  31. }
  32. } catch (InterruptedException e) {
  33. e.printStackTrace();
  34. } catch (ExecutionException e) {
  35. e.printStackTrace();
  36. }
  37. producer.close();
  38. }
  39. }

二、Kafka 用户权限控制 ACL

        在以上都配置好的基础上,在开始之前,我们简单学习下Kafka ACL的格式。根据官网的介绍,Kafka中一条ACL的格式如下:“Principal P is [Allowed/Denied] Operation O From Host H On Resource R”。它们的含义描述如下:

  • principal:表示一个Kafka user
  • operation:表示一个具体的操作类型,有效值: Read(读), Write(写), Create(创建), Delete(删除), Alter(修改), Describe(描述), ClusterAction(集群操作), All(所有)
  • Host:表示连向Kafka集群的client的IP地址,如果是'*’则表示所有IP。注意:当前Kafka不支持主机名,只能指定IP地址
  • Resource:表示一种Kafka资源类型。当前共有5种类型:TOPIC、CLUSTER、GROUP、transactional-id、delegation-token
1.查看 acls

查看所有权限:kafka-acls.sh --authorizer-properties zookeeper.connect=10.194.202.17:8181/kafka --list 

指定一种资源类型查看:kafka-acls.sh --authorizer-properties zookeeper.connect=10.194.202.17:8181/kafka --list --topic test.1

2.添加Acls

注意:

  • 添加acl时,必须指定:–allow-principal,–deny-principal中的一个。
  • 如果不指定IP地址(即–allow-host、–deny-host),默认是所有IP地址

①允许producer用户从ip地址(10.194.202.17)对topic(test.1)执行Read和Write操作

  1. kafka-acls.sh --authorizer-properties zookeeper.connect=10.194.202.17:8181/kafka \
  2. --add \
  3. --allow-principal User:producer \
  4. --allow-host 10.194.202.17 \
  5. --operation Read \
  6. --operation Write \
  7. --topic test.1

②指定资源匹配模式

默认是匹配整个文本(默认:LITERAL) <ANY|MATCH|LITERAL|PREFIXED>

  1. kafka-acls.sh --authorizer-properties zookeeper.connect=10.194.202.17:8181/kafka \
  2. --add \
  3. --allow-principal User:producer \
  4. --producer --topic test.* \
  5. --resource-pattern-type PREFIXED

③添加producer权限

         producer 是一种生产端的方便式写法,相当于 --operation WRITE --operation CREATE,包括的权限有:WRITE、CREATE

  • 允许producer从所有IP地址对topic(test.1)执行生产数据操作
  1. kafka-acls.sh --authorizer-properties zookeeper.connect=10.194.202.17:8181/kafka \
  2. --add \
  3. --allow-principal User:producer \
  4. --producer \
  5. --topic test.1
  • 允许producer从ip地址(172.30.23.154)对topic(test.1)执行生产数据操作

  1. kafka-acls.sh --authorizer-properties zookeeper.connect=10.194.202.17:8181/kafka \
  2. --add \
  3. --allow-principal User:producer \
  4. --allow-host 172.30.23.154 \
  5. --producer \
  6. --topic test.1
  • 拒绝consumer用户对topic(test.1) 执行写权限
  1. kafka-acls.sh --authorizer-properties zookeeper.connect=10.194.202.17:8181/kafka \
  2. --add \
  3. --deny-principal User:consumer \
  4. --operation Write \
  5. --topic test.1

④添加consumer权限

        consumer 是一种消费端的方便式写法,相当于 --operation READ  ,包括的READ

  • 允许consumer用户从所有IP地址对topic(test.1)执行消费数据操作
  1. kafka-acls.sh --authorizer-properties zookeeper.connect=10.194.202.17:8181/kafka \
  2. --add \
  3. --allow-principal User:consumer \
  4. --consumer \
  5. --topic test.1 \
  6. --group c1
  • 通过 --operation 指定消费权限最为灵活

注意: --topic与–group,为两个不同的资源,如果资源匹配模式不同,则需要分开指定

1.consumer用户匹配消费以test.为前缀的topic

  1. kafka-acls.sh --authorizer-properties \
  2. zookeeper.connect=10.194.202.17:8181/kafka \
  3. --add \
  4. --allow-principal User:consumer \
  5. --operation Read --operation Describe \
  6. --topic test. \
  7. --resource-pattern-type PREFIXED

2.consumer用户只能通过groupid(c1)消费数据

  1. kafka-acls.sh --authorizer-properties \
  2. zookeeper.connect=10.194.202.17:8181/kafka \
  3. --add --allow-principal User:consumer \
  4. --operation Read \
  5. --operation Describe \
  6. --group c1 \
  7. --resource-pattern-type LITERAL
3.移除 Acls
删除不同的资源及资源类型
  • 删除group资源

删除–group(消费组)资源,资源类型为LITERAL,c1的所有权限

  1. kafka-acls.sh --authorizer-properties \
  2. zookeeper.connect=10.194.202.17:8181/kafka \
  3. --remove --group c1 \
  4. --resource-pattern-type LITERAL
  5. Are you sure you want to delete all ACLs for resource filter
  6. `ResourcePattern(resourceType=GROUP, name=c1, patternType=LITERAL)`? (y/n)
  • 删除topic资源

删除–topic(主题)资源,资源类型为LITERAL,topic(test.1)下面的所有权限

  1. kafka-acls.sh --authorizer-properties \
  2. zookeeper.connect=10.194.202.17:8181/kafka \
  3. --remove --topic test.1 \
  4. --resource-pattern-type LITERAL
  5. Are you sure you want to delete all ACLs for resource filter
  6. `ResourcePattern(resourceType=TOPIC, name=test.1, patternType=LITERAL)`? (y/n)
  • 不同的资源类型

删除–topic(主题)资源,资源类型为PREFIXED,以(test.)为前缀的topic下面的所有权限

  1. kafka-acls.sh --authorizer-properties \
  2. zookeeper.connect=10.194.202.17:8181/kafka \
  3. --remove --topic test. \
  4. --resource-pattern-type PREFIXED
  5. Are you sure you want to delete all ACLs for resource filter
  6. `ResourcePattern(resourceType=TOPIC, name=test., patternType=PREFIXED)`? (y/n)
删除级别控制

通过是否制定用户(–allow-principal)来控制移除ACL的权限粒度

  • 细粒度的删除

注意:

  • 如果不指定IP地址(即–allow-host、–deny-host),默认是所有IP地址
  • 如果不指定操作类型(即–operation),默认是ALL

不指定IP地址和操作类型

  1. kafka-acls.sh --authorizer-properties \
  2. zookeeper.connect=10.194.202.17:8181/kafka \
  3. --remove \
  4. --topic test.1 \
  5. --allow-principal User:user01 \
  6. --resource-pattern-type LITERAL
  7. Are you sure you want to remove ACLs:
  8. (principal=User:user01, host=*, operation=ALL, permissionType=ALLOW)
  9. from resource filter `ResourcePattern(resourceType=TOPIC, name=test.1, patternType=LITERAL)`? (y/n)

指定IP地址和操作类型

  1. kafka-acls.sh --authorizer-properties \
  2. zookeeper.connect=10.194.202.17:8181/kafka \
  3. --remove \
  4. --topic test.1 \
  5. --allow-principal User:user01 \
  6. --allow-host 10.194.202.17 \
  7. --operation Create \
  8. --resource-pattern-type LITERAL
  9. Are you sure you want to remove ACLs:
  10. (principal=User:user01, host=10.194.202.17, operation=CREATE, permissionType=ALLOW)
  11. from resource filter `ResourcePattern(resourceType=TOPIC, name=test.1, patternType=LITERAL)`? (y/n)
  • 粗粒度的删除

注意:

  • 如果不指定用户(即–allow-principal、–deny-principal),则指定 IP地址和操作类型无效。
    删除只能通过资源类型、名称和资源匹配类型三个条件来删除ACLS权限,
    会删除此资源类型、名称、和资源匹配类型下的所有的权限,所以请谨慎操作。

不指定用户,指定IP地址和操作类型

  1. kafka-acls.sh --authorizer-properties \
  2. zookeeper.connect=10.194.202.17:8181/kafka \
  3. --remove \
  4. --topic test.1 \
  5. --allow-host 10.194.202.17 \
  6. --operation Create \
  7. --resource-pattern-type LITERAL
  8. Are you sure you want to delete all ACLs for resource filter
  9. `ResourcePattern(resourceType=TOPIC, name=test.1, patternType=LITERAL)`? (y/n)

不指定用户,不指定IP地址和操作类型

  1. kafka-acls.sh --authorizer-properties \
  2. zookeeper.connect=10.194.202.17:8181/kafka \
  3. --remove \
  4. --topic test.1 \
  5. --resource-pattern-type LITERAL
  6. Are you sure you want to delete all ACLs for resource filter
  7. `ResourcePattern(resourceType=TOPIC, name=test.1, patternType=LITERAL)`? (y/n)

问题整理

一、配置SASL_PLAINTEXT认证和ACLS授权常见问题

kafka启动报错

  • 问题1

[Controller id=11, targetBrokerId=11] Failed authentication with nj03-ns-passport-201611-c2c-m12-09.nj03.baidu.com/10.194.202.17 (Authentication failed: Invalid username or password) (org.apache.kafka.common.network.Selector)

原因: SASL_PLAINTEXT认证时用户密码验证不通过,即 kafka_server_jaas.conf 文件中KafkaServer 模块 Broker与Broker之间sasl用户密码验证不通过,如下:

username="kafkacluster1"
password="kafkaclusterpasswd"
user_kafkacluster="kafkaclusterpasswd"

  • 问题2

ERROR [KafkaApi-11] Error when handling request: clientId=11, correlationId=0, api=UPDATE_METADATA, version=6, body={controller_id=11,controller_epoch=1,broker_epoch=4294970758,topic_states=[],live_brokers=[{id=11,endpoints=[{port=8092,host=10.194.202.17,listener=SASL_PLAINTEXT,security_protocol=2,_tagged_fields={}}],rack=null,_tagged_fields={}}],_tagged_fields={}} (kafka.server.KafkaApis)
org.apache.kafka.common.errors.ClusterAuthorizationException: Request Request(processor=0, connectionId=10.194.202.17:8092-10.194.202.17:59100-0, session=Session(User:kafkacluster,/10.194.202.17), listenerName=ListenerName(SASL_PLAINTEXT), securityProtocol=SASL_PLAINTEXT, buffer=null) is not authorized

原因: kafka_server_jaas.conf 文件中KafkaServer 模块 Broker与Broker之间sasl用户密码验证正确,但acls 授权没通过。
需要添加超级用户,超级用户不需要acls授权,在server.properties文件中添加kafkacluster用户,如下:

super.users=User:admin;User:kafkacluster

kafka 客户端(包括consumer和producer)连接报错

  • 问题1

[2020-04-17 16:17:20,986] WARN [Consumer clientId=consumer_01, groupId=c1] Bootstrap broker 10.194.202.17:8092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)

[2020-04-17 16:17:20,989] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: Invalid username or password

原因:SASL_PLAINTEXT认证时用户密码验证不通过,即 kafka_server_jaas.conf 文件中KafkaServer 模块 client与Broker之间sasl用户密码验证不通过。
检查如下用户名密码:

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='consumer' password='consumerpasswd';

  • 问题2

[2020-04-17 16:17:01,166] WARN [Consumer clientId=consumer_01, groupId=c1] Error while fetching metadata with correlation id 2 : {test.1=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)

[2020-04-17 16:17:01,172] ERROR [Consumer clientId=consumer_01, groupId=c1] Topic authorization failed for topics [test.1] (org.apache.kafka.clients.Metadata)

[2020-04-17 16:17:01,174] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test.1]

原因:SASL_PLAINTEXT认证时用户密码验证正确,但acls 授权没通过,即此用户没有访问此topic(test.1)的权限,请联系管理员对用户授权

参考:
Apache Kafka
Kafka 中文文档 - ApacheCN

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多