分享

Kafka ACL实现架构以及实操案例剖析

 昵称10087950 2023-02-20 发布于江苏

大家好,我是威哥,《RocketMQ技术内幕》、《RocketMQ实战》作者、RocketMQ社区首席布道师、极客时间《中间件核心技术与实战》专栏作者、中通快递基础架构资深架构师,越努力越幸运,唯有坚持不懈,与大家共勉。

本文来自中通快递高级中间件开发工程师李大伟的投稿,文章首发:https://blog.csdn.net/u011618288/article/details/129105777,强烈大家关注他的博客。

1、方案设计

基本原则:不接入外部存储,基于kafka原生ACL认证

环境:kafka-2.2.1、zookeeper-3.6.3

kafka给我们提供了SASL/SCRAM模式,将SASL、ACL规则信息存储到zookeeper中,并且通过KafkaClientAdmin Api可新增、编辑、删除ACL规则,其特性如下

应用发送、消费实现动态身份认证和授权 基于kafka SASL/SCRAM模式,客户端会在建立连接进行SASL身份验证,在消费发送时,进行ACL鉴权

安全认证代码无侵入性、兼容原生kafka api 有两种配置方式,通过根据-Djava.security.auth.login.config指定jaas配置文件,并且配置producer.properties/consumer.properties 开启SASL配置,兼容原生kafka api

账号级别区分:管理账号有最高权限、业务账号限定资源权限 可通过配置kafka集群超级管理员,通过超级管理员账号登录是不进行ACL鉴权。因此超级管理员可以操作所有的资源权限

安全认证支持动态开启、关闭 kafka提供SASL和ACL管理的api,通过这些api可以新增、修改、查询、删除SASL和ACL策略。通过配置策略,可以实现安全策略开启、关闭

1.1 客户端与kafka集群认证、授权

客户端与Kafka集群认证、授权总体工作机制如下图所示:

Image

安全认证、授权步骤如下:

在集群初始化时,配置开启SASL、ACL,包括

broker与broker的SASL认证配置

broker与zookeeper的SASL认证配置

添加SASL、ACL规则 管理员账号:添加SASL管理员账号,管理员账号新增完成后,broker与broker的认证才会成功。应用账号:新增主题、消费组时,通过KafkaAdminClient Api新增以alice为用户名,应用scret-1为密码的SASL认证信息。并且给这个用户授予消息写入权限

客户端配置SASL配置,并发送消息 在properties中配置SASL账号密码。或者指定jaas配置文件

服务端认证、鉴权 在客户端与服务端建立连接时,对客户端alice进行身份认证(SASL),认证通过后,在发送消息时检查资源(主题)是否授权(ACL)给改用户alice

1.2 zookeeper与broker认证、鉴权

我们这一篇文章中zookeeper与broker认证,鉴权主要采取的方式为SASL/SCRAM + ACL,其核心设计图如下所示:

Image

2、kafka SASL+ACL实战

2.1 集群配置

2.1.1 zookeeper SASL配置

在zookeeper的配置文件中zoo.cfg中增加如下配置项:

sessionRequireClientSASLAuth=true

jaasLoginRenew=3600000

requireClientAuthScheme=sasl

zookeeper.sasl.client=true

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

并且新增JAAS配置文件:zoo_jaas.conf,用于定义认证身份信息,其配置内容如下所示:Server {

org.apache.zookeeper.server.auth.DigestLoginModule required

username='zookeeper'

password='zookeepersecret”

user_kafka='kafkasecret';

};

然后需要指定jaas配置文件:

export SERVER_JVMFLAGS='-Djava.security.auth.login.config=/Users/vhicool/zookeeper/conf/zoo_jaas.conf'

通过如下命令启动zookeeper服务端./bin/zkServer.sh start-foreground

2.1.2 Kafka SASL配置

首先我们新增broker账号,需要在broker重庆应用sasl+acl配置之前执行。

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=adminsecret]' --entity-type users --entity-name admin

然后再新增管理账号,管理员账号用于运维人员维护集群时使用,其具体命令如下:bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=adminsecret]' --entity-type users --entity-name kafkaadmin

然后再服务端创建jaas.conf,具体的内容如下所示:

//broker与broker

KafkaServer {

org.apache.kafka.common.security.plain.PlainLoginModule required

username='admin'

password='adminsecret';

};

//broker与zookeeper

Client{

org.apache.kafka.common.security.plain.PlainLoginModule required

username='kafka'

password='kafkasecret';

};

然后通过-D参数指定kafka环境变量,使得SASL生效:-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf

最后在kafka的server.properties配置文件中配置如下内容:

#SASL/SCRAM

listeners=SASL_PLAINTEXT://host.name:port

security.inter.broker.protocol=SASL_PLAINTEXT

sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256

sasl.enabled.mechanisms=SCRAM-SHA-25

#ACL

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

super.users=User:admin

2.2 新增认证、授权规则

除了使用kafka自带cli,同时也可以使用kafka通过的api去维护:KafkaClientAdmin,如果使用该类进行操作,需要新增kafka client JAAS认证文件:kafka_client_jaas.conf,具体的内容如下所示://kafka client与zookeeper SASL

Client{

org.apache.kafka.common.security.plain.PlainLoginModule required

username='kafka'

password='kafkasecret';

};

// kafka client与broker SASL

KafkaClient {

org.apache.kafka.common.security.plain.PlainLoginModule required

username='kafkaadmin'

password='adminsecret';

};

然后需要通过环境变量设置登录验证文件,具体命令如下:

export KAFKA_OPTS='-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf'

接下来我们尝试使用命令行新增主题。

使用如下命令创建一个主题: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-1

给用户Alice saal设置登录信息:

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret]' --entity-type users --entity-name alice

并对Alice用户对主题topic-1设置发送权限,代码如下所示:bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:alice --producer --topic topic-1

经过上面的操作, 我们可以用下面的命令尝试对topic-1主题发送消息:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-1 --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=SCRAM-SHA-256

2.3 客户端配置

在用户alice配置SASL认证,并且授权主题topic-1发送权限后,客户端通过原生api发送消息,示例代码如下所示:  Properties kafkaProperties = new Properties();

kafkaProperties.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,

'org.apache.kafka.common.serialization.StringSerializer');

kafkaProperties.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,

'org.apache.kafka.common.serialization.ByteArraySerializer');

kafkaProperties.put('bootstrap.servers', 'localhost:9092');

//SASL配置

kafkaProperties.put('sasl.jaas.config','org.apache.kafka.common.security.scram.ScramLoginModule required username=\'alice\' password=\'alice-secret\';');

kafkaProperties.put('security.protocol','SASL_PLAINTEXT');

kafkaProperties.put('sasl.mechanism','SCRAM-SHA-256');

KafkaProducer producer = new KafkaProducer<>(kafkaProperties);

ProducerRecord record = new ProducerRecord('topic-1', null, null, 'test message', null);

producer.send(record).get();

3、自定义SASL、ACL存储形式

3.1 SASL存储形式

SASL的存储形式主义分为本地存储,zookeeper存储和外部认证服务器存储,一一介绍如下:

本地文件存储 PLAIN模式,SASL/PLAIN是一种简单的用户名/密码认证机制,通常与TLS一起用于加密以实现安全认证。他的SASL认证是存储在本地JAAS文件,因此如果需要实现动态身份认证,需要拓展SASL认证接口

zookeeper存储

SASL/SCRAM 它解决了与执行用户名/密码认证的传统机制(如PLAIN和DIGEST-MD5)的安全问题。RFC 5802中定义了该机制。Kafka支持SCRAM-SHA-256和SCRAM-SHA-512,它们可以与TLS一起使用以执行安全认证。用户名用作配置ACL等的认证主体。Kafka中的默认SCRAM实现在Zookeeper中存储SCRAM凭据,适用于Zookeepher位于专用网络上的Kafka安装

外部认证服务器存储 SASL/Kerberos

kafka默认SASL是通过静态JAAS文件配置的,从Kafka 2.0版开始,可以通过使用配置选项sasl.server.callback.handler.class和sasl.client.callback.handler.class配置自己的回调处理程序来从外部源获取用户名和密码,从而避免在磁盘上存储明文密码。

sasl.server.callback.handler.class 实现 AuthenticateCallbackHandler 接口的 SASL 服务器回调处理程序类的完全限定名称。服务器回调处理程序必须以侦听器前缀和小写的 SASL 机制名称为前缀。例如,listener.name.sasl_ssl.plain.sasl.server.callback.handler.class=com.example.CustomPlainCallbackHandler

sasl.client.callback.handler.class 实现 AuthenticateCallbackHandler 接口的 SASL 客户端回调处理程序类的完全限定名称。

3.2 ACL存储形式

kafka默认ACL存储在zookeeper,其时序图如下所示:

Image

我们可以通过客户端配置authorizer.class.name,自定义实现存储位置。

kafka默认是通过zookeeper存储ACL规则,实现类为:kafka.security.auth.SimpleAclAuthorizer。如果我们要扩展ACL存储,需要自定义认证类并实现kafka.security.auth.Authorizer接口,包括实现authorize、addAcls、removeAcls、getAcls等方法,并在broker配置文件中指定自定义的authorizer即可。

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多