分享

数据库|Flask+Redis实现登录权限管理

 算法与编程之美 2020-09-18

1 准备工作

Redis简单来说就是远程字典服务,通常也被称为数据结构服务器,因为他的值(value)可以是多种形式的。在开始之前,需要先安装Redis,这里先不过多赘述,需要注意的是安装完成后需要设置密码,具体方法可以在百度上搜索,很简单。首先在Flask配置文件添加Redis的配置信息。

app.config['REDIS_HOST'] = 'localhost'   #Redis的ip地址,本机的就是localhost

app.config['REDIS_PORT'] = 6379   #Redis端口,默认为6379

app.config['REDIS_DB'] = '0'

app.config['REDIS_PWD'] = 'yourpassword'  #Redis的密码

2 Redis数据库操作

utils文件目录下新建redis_utils.py文件,作用是对redis数据库进行操作。代码如下

import pickle

import redis

from flask import current_app as app

class Redis(object):

     """

     redis数据库操作

     """

     @staticmethod

     def _get_r():

         host = app.config['REDIS_HOST']

         port = app.config['REDIS_PORT']

         db = app.config['REDIS_DB']

         passwd = app.config['REDIS_PWD']

         r = redis.StrictRedis(host=host, port=port, db=db, password=passwd)

         return r

     @classmethod

     def write(self, key, value, expire=None):

         """

         写入键值对

         """

         # 判断是否有过期时间,没有就设置默认值

         if expire:

            expire_in_seconds = expire

         else:

            expire_in_seconds =  app.config['EXPIRES_IN']

         r = self._get_r()

         r.set(key, value, ex=expire_in_seconds)

     @classmethod

     def write_dict(self, key, value, expire=None):

         '''

         将内存数据二进制通过序列号转为文本流,再存入redis

         '''

         if expire:

            expire_in_seconds = expire

         else:

             expire_in_seconds =  app.config['REDIS_EXPIRE']

         r = self._get_r()

         r.set(pickle.dumps(key), pickle.dumps(value), ex=expire_in_seconds)

     @classmethod

     def read_dict(self, key):

         '''

         将文本流从redis中读取并反序列化,返回

         '''

         r = self._get_r()

         data = r.get(pickle.dumps(key))

         if data is None:

            return None

         return pickle.loads(data)

     @classmethod

     def read(self, key):

         """

         读取键值对内容

         """

         r = self._get_r()

         value = r.get(key)

         return value.decode('utf-8') if value else value

     @classmethod

     def hset(self, name, key, value):

         """

         写入hash

         """

         r = self._get_r()

         r.hset(name, key, value)

     @classmethod

     def hmset(self, key, *value):

         """

         读取指定hash表的所有给定字段的值

         """

         r = self._get_r()

         value = r.hmset(key, *value)

         return value

     @classmethod

     def hget(self, name, key):

         """

         读取指定hash表的键值

         """

         r = self._get_r()

         value = r.hget(name, key)

         return value.decode('utf-8') if value else value

     @classmethod

     def hgetall(self, name):

         """

         获取指定hash表所有的值

         """

         r = self._get_r()

         return r.hgetall(name)

     @classmethod

     def delete(self, *names):

         """

         删除一个或者多个

         """

         r = self._get_r()

         r.delete(*names)

     @classmethod

     def hdel(self, name, key):

         """

         删除指定hash表的键值

         """

         r = self._get_r()

         r.hdel(name, key)

     @classmethod

     def expire(self, name, expire=None):

         """

         设置过期时间

         """

         if expire:

            expire_in_seconds = expire

         else:

            expire_in_seconds =  app.config['REDIS_EXPIRE']

         r = self._get_r()

         r.expire(name, expire_in_seconds)

3 实现用户登录验证

首先注册一个user的蓝图

app.register_blueprint(user_blueprint,  url_prefix="/user/")

user = Blueprint("user",  __name__)

permisson目录下新建一个user.py。这里是写跟登录有关的接口的。首先是登录验证,大概思路是先接收用户名与密码,然后校验参数,两者都不能为空,接着用用户名去user数据库查找是否存在此用户,如果查找结果为空,则返回一个错误码。接着校验接收到的密码与数据库的密码是否匹配。(在存入密码的时候不能直接存明文,需要加密,此处用到了werkzeug.security这个库进行加密。)校验密码还是用这个库的check_password_hash,只需传入需要验证的两个密码。再用户名与密码都正确的情况下,生成一个token,以后客户端只需要带上这个token来请求即可。这时token需要存入Redis中,客户端传来的token就和Redis中的token作对比。当生成token后,这就登录成功了,然后返回token等用户信息。接下来是login的代码

@user.route('/login',  methods=["POST"])

def login():

     '''

     用户登录

     :return:token

     '''

     data = request.get_data()

     data = str(data, 'utf-8')

     res_dir = json.loads(data)

     print(res_dir)

     if res_dir is None:

         return NO_PARAMETER()

     # 获取前端传过来的参数

     username = res_dir.get("username")

     password = res_dir.get("password")

     # 校验参数

     if not all([username, password]):

         return jsonify(code=Code.NOT_NULL.value, msg="用户名和密码不能为空")

     try:

         user = User.query.filter_by(user_name=username).first()

     except Exception as e:

         print("login error{}".format(e))

         return jsonify(code=Code.REQUEST_ERROR.value, msg="获取信息失败")

     if user is None or not user.check_pwd(password) or user.del_flag == 2  or user.status == 2:

         return jsonify(code=Code.ERR_PWD.value, msg="用户名或密码错误")

     # 获取用户信息,传入生成token的方法,并接收返回的token

     # 获取用户角色

     user_role = Role.query.join(UserRole, Role.id ==  UserRole.role_id).join(User,

                                                                               UserRole.user_id == user.id).filter(

         User.id == user.id).all()

    role_list = [i.role_key for i in user_role]

     token = create_token(user.id, user.user_name, role_list)

     data = {'token': token, 'userId': user.id, 'userName': user.user_name,  'nickname': user.nickname}

     # 记录登录iptoken存入rerdis

     try:

         user.login_ip = request.remote_addr

         print(user)

         user.update()

         Redis.write(f"token_{user.user_name}", token)

     except Exception as e:

         print(e)

         return jsonify(code=Code.UPDATE_DB_ERROR.value, msg="登录失败:"+str(e))

     if token:

         # token返回给前端

         return jsonify(code=Code.SUCCESS.value, msg="登录成功", data=data)

     else:

         return jsonify(code=Code.REQUEST_ERROR.value, msg="请求失败", data=token)

生成token的方法:

def create_token(user_id, user_name,  role_list):

     '''

     生成token

     :param api_user:用户id

     :return: token

     '''

     # 第一个参数是内部的私钥,这里写在共用的配置信息里了,如果只是测试可以写死

     # 第二个参数是有效期()

     s = Serializer(app.config['SECRET_KEY'],  expires_in=app.config['EXPIRES_IN'])

     # 接收用户id转换与编码

     token = None

     try:

         token = s.dumps({"id": user_id, "name": user_name,  "role": role_list}).decode("ascii")

     except Exception as e:

         app.logger.error("获取token失败:{}".format(e))

     return token

校验token方法:

def verify_token(token):

     '''

     校验token

     :param token:

     :return: 用户信息 or None

     '''

     # 参数为私有秘钥,跟上面方法的秘钥保持一致

     s = Serializer(app.config['SECRET_KEY'])

     try:

         # 转换为字典

         data = s.loads(token)

         return data

     except Exception as e:

         app.logger.error(f"token转换失败:{e}")

         return None

注销登录,接收到token后作对比,成功后将redistoken删除。

@user.route('/logout',  methods=["POST"])

@login_required()

def logout():

     '''

     注销方法:redis删除token

     :return:

     '''

     try:

         token = request.headers["Authorization"]

         user = verify_token(token)

         if user:

            key =  f"token_{user.get('name')}"

            redis_token = Redis.read(key)

            if redis_token:

                Redis.delete(key)

            return SUCCESS()

         else:

             return AUTH_ERR()

     except Exception as e:

         app.logger.error(f"注销失败")

         return REQUEST_ERROR()

检查是否登录:

@user.route('/check_token',  methods=["POST"])

def check_token():

     # 在请求头上拿到token

     token = request.headers["Authorization"]

     user = verify_token(token)

     if user:

         key = f"token_{user.get('name')}"

         redis_token = Redis.read(key)

         if redis_token == token:

            return  SUCCESS(data=user.get('id'))

         else:

            return OTHER_LOGIN()

     else:

         return AUTH_ERR()

最后是需要做一个登录拦截器,意思是在客户端访问某些接口时,需要先进行登录验证,通过以后才能正常访问。

def login_required(*role):

     def decorator(func):

         @functools.wraps(func)

         def wrapper(*args, **kw):

            try:

                # 在请求头上拿到token

                 token =  request.headers["Authorization"]

            except Exception as e:

                # 没接收的到token,给前端抛出错误

                return  jsonify(code=Code.NO_PARAMETER.value, msg='缺少参数token')

            s =  Serializer(app.config['SECRET_KEY'])

            try:

                user = s.loads(token)

                if role:

                    # 获取token中的权限列表如果在参数列表中则表示有权限,否则就表示没有权限

                    user_role = user['role']

                    result = [x for x in  user_role if x in list(role)]

                    if not result:

                        return  jsonify(code=Code.ERR_PERMISSOM.value, msg="权限不够")

            except Exception as e:

                return  jsonify(code=Code.LOGIN_TIMEOUT.value, msg="登录已过期")

            return func(*args, **kw)

         return wrapper

     return decorator

只需要在需要的接口下面加上@login_required()即可。

例如:

@home.route('/')

@login_required()

def test():

     return jsonify({

         'code': 0,

         'msg': 'test',

     })


END

实习编辑   |   王文星

责       编   |   刘玉江

能力越强,责任越大。实事求是,严谨细致。    

                                                  ——where2go 团队


微信号:算法与编程之美          

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多