大家好,我是威哥,《RocketMQ技术内幕》、《RocketMQ实战》作者、RocketMQ社区首席布道师、极客时间《中间件核心技术与实战》专栏作者、中通快递基础架构资深架构师,越努力越幸运,唯有坚持不懈,与大家共勉。 本文来自中通快递高级中间件开发工程师李大伟的投稿,文章首发:https://blog.csdn.net/u011618288/article/details/129105777,强烈大家关注他的博客。 1、方案设计 基本原则:不接入外部存储,基于kafka原生ACL认证 环境:kafka-2.2.1、zookeeper-3.6.3 kafka给我们提供了SASL/SCRAM模式,将SASL、ACL规则信息存储到zookeeper中,并且通过KafkaClientAdmin Api可新增、编辑、删除ACL规则,其特性如下 应用发送、消费实现动态身份认证和授权 基于kafka SASL/SCRAM模式,客户端会在建立连接进行SASL身份验证,在消费发送时,进行ACL鉴权 安全认证代码无侵入性、兼容原生kafka api 有两种配置方式,通过根据-Djava.security.auth.login.config指定jaas配置文件,并且配置producer.properties/consumer.properties 开启SASL配置,兼容原生kafka api 账号级别区分:管理账号有最高权限、业务账号限定资源权限 可通过配置kafka集群超级管理员,通过超级管理员账号登录是不进行ACL鉴权。因此超级管理员可以操作所有的资源权限 安全认证支持动态开启、关闭 kafka提供SASL和ACL管理的api,通过这些api可以新增、修改、查询、删除SASL和ACL策略。通过配置策略,可以实现安全策略开启、关闭 1.1 客户端与kafka集群认证、授权 客户端与Kafka集群认证、授权总体工作机制如下图所示: 安全认证、授权步骤如下: 在集群初始化时,配置开启SASL、ACL,包括 broker与broker的SASL认证配置 broker与zookeeper的SASL认证配置 添加SASL、ACL规则 管理员账号:添加SASL管理员账号,管理员账号新增完成后,broker与broker的认证才会成功。应用账号:新增主题、消费组时,通过KafkaAdminClient Api新增以alice为用户名,应用scret-1为密码的SASL认证信息。并且给这个用户授予消息写入权限 客户端配置SASL配置,并发送消息 在properties中配置SASL账号密码。或者指定jaas配置文件 服务端认证、鉴权 在客户端与服务端建立连接时,对客户端alice进行身份认证(SASL),认证通过后,在发送消息时检查资源(主题)是否授权(ACL)给改用户alice 1.2 zookeeper与broker认证、鉴权 我们这一篇文章中zookeeper与broker认证,鉴权主要采取的方式为SASL/SCRAM + ACL,其核心设计图如下所示: 2、kafka SASL+ACL实战 2.1 集群配置 2.1.1 zookeeper SASL配置 在zookeeper的配置文件中zoo.cfg中增加如下配置项: sessionRequireClientSASLAuth=true jaasLoginRenew=3600000 requireClientAuthScheme=sasl zookeeper.sasl.client=true authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider 并且新增JAAS配置文件:zoo_jaas.conf,用于定义认证身份信息,其配置内容如下所示:Server { org.apache.zookeeper.server.auth.DigestLoginModule required username='zookeeper' password='zookeepersecret” user_kafka='kafkasecret'; }; 然后需要指定jaas配置文件: export SERVER_JVMFLAGS='-Djava.security.auth.login.config=/Users/vhicool/zookeeper/conf/zoo_jaas.conf' 通过如下命令启动zookeeper服务端./bin/zkServer.sh start-foreground 2.1.2 Kafka SASL配置 首先我们新增broker账号,需要在broker重庆应用sasl+acl配置之前执行。 bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=adminsecret]' --entity-type users --entity-name admin 然后再新增管理账号,管理员账号用于运维人员维护集群时使用,其具体命令如下:bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=adminsecret]' --entity-type users --entity-name kafkaadmin 然后再服务端创建jaas.conf,具体的内容如下所示: //broker与broker KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='adminsecret'; }; //broker与zookeeper Client{ org.apache.kafka.common.security.plain.PlainLoginModule required username='kafka' password='kafkasecret'; }; 然后通过-D参数指定kafka环境变量,使得SASL生效:-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf 最后在kafka的server.properties配置文件中配置如下内容: #SASL/SCRAM listeners=SASL_PLAINTEXT://host.name:port security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 sasl.enabled.mechanisms=SCRAM-SHA-25 #ACL authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer super.users=User:admin 2.2 新增认证、授权规则 除了使用kafka自带cli,同时也可以使用kafka通过的api去维护:KafkaClientAdmin,如果使用该类进行操作,需要新增kafka client JAAS认证文件:kafka_client_jaas.conf,具体的内容如下所示://kafka client与zookeeper SASL Client{ org.apache.kafka.common.security.plain.PlainLoginModule required username='kafka' password='kafkasecret'; }; // kafka client与broker SASL KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username='kafkaadmin' password='adminsecret'; }; 然后需要通过环境变量设置登录验证文件,具体命令如下: export KAFKA_OPTS='-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf' 接下来我们尝试使用命令行新增主题。 使用如下命令创建一个主题: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-1 给用户Alice saal设置登录信息: bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret]' --entity-type users --entity-name alice 并对Alice用户对主题topic-1设置发送权限,代码如下所示:bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:alice --producer --topic topic-1 经过上面的操作, 我们可以用下面的命令尝试对topic-1主题发送消息: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-1 --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=SCRAM-SHA-256 2.3 客户端配置 在用户alice配置SASL认证,并且授权主题topic-1发送权限后,客户端通过原生api发送消息,示例代码如下所示: Properties kafkaProperties = new Properties(); kafkaProperties.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 'org.apache.kafka.common.serialization.StringSerializer'); kafkaProperties.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 'org.apache.kafka.common.serialization.ByteArraySerializer'); kafkaProperties.put('bootstrap.servers', 'localhost:9092'); //SASL配置 kafkaProperties.put('sasl.jaas.config','org.apache.kafka.common.security.scram.ScramLoginModule required username=\'alice\' password=\'alice-secret\';'); kafkaProperties.put('security.protocol','SASL_PLAINTEXT'); kafkaProperties.put('sasl.mechanism','SCRAM-SHA-256'); KafkaProducer producer = new KafkaProducer<>(kafkaProperties); ProducerRecord record = new ProducerRecord('topic-1', null, null, 'test message', null); producer.send(record).get(); 3、自定义SASL、ACL存储形式 3.1 SASL存储形式 SASL的存储形式主义分为本地存储,zookeeper存储和外部认证服务器存储,一一介绍如下: 本地文件存储 PLAIN模式,SASL/PLAIN是一种简单的用户名/密码认证机制,通常与TLS一起用于加密以实现安全认证。他的SASL认证是存储在本地JAAS文件,因此如果需要实现动态身份认证,需要拓展SASL认证接口 zookeeper存储 SASL/SCRAM 它解决了与执行用户名/密码认证的传统机制(如PLAIN和DIGEST-MD5)的安全问题。RFC 5802中定义了该机制。Kafka支持SCRAM-SHA-256和SCRAM-SHA-512,它们可以与TLS一起使用以执行安全认证。用户名用作配置ACL等的认证主体。Kafka中的默认SCRAM实现在Zookeeper中存储SCRAM凭据,适用于Zookeepher位于专用网络上的Kafka安装 外部认证服务器存储 SASL/Kerberos kafka默认SASL是通过静态JAAS文件配置的,从Kafka 2.0版开始,可以通过使用配置选项sasl.server.callback.handler.class和sasl.client.callback.handler.class配置自己的回调处理程序来从外部源获取用户名和密码,从而避免在磁盘上存储明文密码。 sasl.server.callback.handler.class 实现 AuthenticateCallbackHandler 接口的 SASL 服务器回调处理程序类的完全限定名称。服务器回调处理程序必须以侦听器前缀和小写的 SASL 机制名称为前缀。例如,listener.name.sasl_ssl.plain.sasl.server.callback.handler.class=com.example.CustomPlainCallbackHandler sasl.client.callback.handler.class 实现 AuthenticateCallbackHandler 接口的 SASL 客户端回调处理程序类的完全限定名称。 3.2 ACL存储形式 kafka默认ACL存储在zookeeper,其时序图如下所示: 我们可以通过客户端配置authorizer.class.name,自定义实现存储位置。 kafka默认是通过zookeeper存储ACL规则,实现类为:kafka.security.auth.SimpleAclAuthorizer。如果我们要扩展ACL存储,需要自定义认证类并实现kafka.security.auth.Authorizer接口,包括实现authorize、addAcls、removeAcls、getAcls等方法,并在broker配置文件中指定自定义的authorizer即可。 authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer |
|
来自: 昵称10087950 > 《JAVA》