celery异步异步任务处理
一、celery简介
celery 官方文档英文版:http://docs./en/latest/index.html

组件
-
任务Tasks:用户需要实现的功能。分为异步任务和定时任务。
-
中间人Broker:任务队列,存放任务的地方,worker执行单元获取任务的地方。中间人采用Redis或者是RabbitMQ。
-
执行者Worker:监听任务队列里面是否有任务,有就处理任务
-
存储Backend:把执行Tasks返回的结果进行存储
二、应用场景
异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等
定时任务:定时执行某件事情
三、安装和配置
-
导入模块
pip install celery
-
配置django
-
在项目根目录下面创建一个包celery_tasks
-
在创建的celery_tasks包里面创建一个config.py用来专门配置我们需要的信息或者是在settings里面配置,然后在调用。使用redis来作为中间人
broker_url = "redis://127.0.0.1/15" # 任务队列存储的地方
result_backend = "redis://127.0.0.1/14" # 任务执行结果存储的地方
-
在 celery_tasks 包里面创建一个文件夹用来放需要执行的任务:里面放任务的处理流程
-
在celery_tasks包里面创建一个main.py文件,用来作为celery的client启动文件
import os
from celery import Celery
# 为celery使用django配置文件进行设置
if not os.getenv('DJANGO_SETTINGS_MODULE'):
os.environ['DJANGO_SETTINGS_MODULE'] = '项目名.settings'
# 创建celery实例
app = Celery('任务名') # 自己创建一个
# 导入celery配置
app.config_from_object('celery_tasks.config')
# 自动注册celery任务
app.autodiscover_tasks(['celery_tasks.任务文件名'])
-
使用 celery 开启服务
celery -A celery_tasks.main worker -l info
# -A 选项指定 celery 实例 app 的位置
# -l 选项指定日志级别, -l 是 --loglevel 的缩略形式
四、使用celery异步完善aly-blog中短信发送
分析:
- 创建一个celery的服务器
- 创建一个send_sms的任务
- 在数据保存到redis中后调用delay进行异步发送短信
1. celery_tasks/main.py
# -*- coding: utf-8 -*-
from celery import Celery
# 为celery使用django配置文件进行设置
import os
if not os.getenv('DJANGO_SETTINGS_MODULE'):
os.environ['DJANGO_SETTINGS_MODULE'] = 'alyBlog.settings'
# 创建celery应用/实例
app = Celery('aly-blog')
# 导入celery配置
app.config_from_object('celery_tasks.config')
# 自动注册celery任务
app.autodiscover_tasks(['celery_tasks.sms'])
2. celery_tasks/config.py
# -*- coding: utf-8 -*-
broker_url = "redis://127.0.0.1/15"
result_backend = "redis://127.0.0.1/14"
3. celery_tasks/sms/tasks.py
import logging
from celery_tasks.main import app
from utils.yuntongxun.sms import CCP
logger = logging.getLogger("django")
@app.task(name='send_sms_code')
def send_sms_code(mobile, sms_num, expires, temp_id):
"""
发送短信验证码
:param mobile: 手机号
:param sms_num: 验证码
:param expires: 有效期
:return: None
"""
try:
result = CCP().send_Template_sms(mobile, [sms_num, expires], temp_id)
except Exception as e:
logger.error("发送验证码短信[异常][ mobile: %s, message: %s ]" % (mobile, e))
else:
if result == 0:
logger.info("发送验证码短信[正常][ mobile: %s sms_code: %s]" % (mobile, sms_num))
else:
logger.warning("发送验证码短信[失败][ mobile: %s ]" % mobile)
4. users/verifications/views.py
from celery_tasks.sms.tasks import send_sms_code
class SmsCodeView(View):
"""
# 1. 创建一个SmsCodeView类
param: mobile、image_text、image_code_id
"""
# 2. 创建一个post方法用来处理逻辑
def post(self, request):
# 3. 获取前端传来的数据
json_data = request.body
# 4. 将数据转化为字典
dict_data = json.loads(json_data)
# 5. 将数据传递给SmsCodeForm表单进行校验
form = SmsCodeForm(data=dict_data)
# 6. 校验成功处理方式
if form.is_valid():
# 7. 获取校验后的数据
mobile = form.cleaned_data.get("mobile")
# 8. 生成短信验证码
sms_text = "%06d" % random.randint(0, 999999)
# 9. 将短信验证码和和过期时间保存到redis中
redis_obj = get_redis_connection("verify_code")
sms_text_key = "sms_code_{}".format(mobile).encode("utf8")
sms_repeat_key = "sms_sixty_{}".format(mobile).encode("utf8")
redis_obj.setex(sms_text_key, contains.SMS_CODE_EXPIRE_TIME, sms_text) # key, expire_time, value
redis_obj.setex(sms_repeat_key, contains.SMS_CODE_EXPIRE_TIME, contains.SMS_REPEAT_EXPIRE_TIME)
# logger.info("发送短信正常[mobile:%s sms_num:%s]" % (mobile, sms_text)) # 调试代码时候用,在控制台显示
# print(sms_text)
# return to_json_data(errmsg="短信发送成功") # 短信调试
# # 9. 使用用通讯插件发送短信
# try:
# result = CCP().send_Template_sms(mobile, [sms_text, contains.SMS_CCP_EXPIRE_TIME], contains.SMS_TEMPLATE)
# except Exception as e:
# logger.error("短信发送异常[mobile:{},error:{}]".format(mobile, e))
# return to_json_data(errno=Code.SMSERROR, errmsg=error_map[Code.SMSERROR]) # 短信发送异常
# else:
# if result == 0: # 发送成功
# logger.info("短信发送成功[mobile:{},sms_code:{}]".format(mobile, sms_text))
# return to_json_data(errmsg="短信发送正常")
# else: # 发送失败
# logger.warning("短信发送失败[mobile:{}]".format(mobile))
# return to_json_data(errno=Code.SMSFAIL, errmsg=error_map[Code.SMSFAIL])
# 使用celery异步处理短信发动任务 修改的就这里一处
expires = 300
send_sms_code.delay(mobile,sms_text,expires,contains.SMS_TEMPLATE)
return to_json_data(errno=Code.OK,errmsg="短信验证码发送成功")
# 校验未通过
else:
err_msg_list = []
for item in form.errors.values():
err_msg_list.append(item[0])
err_info = '/'.join(err_msg_list)
return to_json_data(errno=Code.PARAMERR, errmsg=err_info)
五、 使用celery定时任务完成邮箱验证
这里采用网易云163邮箱进行发送
分析:
- 当用户注册完成后,在跳转到登录界面之间插入一个邮箱验证,用来完成激活
- 当邮件发送到客户邮箱中后,过期时间设定为60min,
- 当用户点击时将Users模型中的is_active设置为True,同时跳转到登录界面
main任务执行方法和config配置方法不变
1. settings配置
# 配置邮箱
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_USE_SSL = True
EMAIL_HOST = 'smtp.163.com' # SMTP服务器: smtp.163.com
EMAIL_PORT = 465 # 端口
EMAIL_FROM = '邮箱验证<优先>' # 收件人看到的发件人
3. celery_tasks/celery_email/tasks.py
# -*- coding: utf-8 -*-
"""
@Time : 2020/3/23 21:21
@Author : 半纸梁
@File : tasks.py
"""
from celery_tasks.main import app
from django.core.mail import send_mail
from alyBlog import settings
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from utils.user_config import user_config
@app.task(name="send_verify_email")
def send_verify_email(username, to_email, rank):
# send_mail(subject, message, from_email, recipient_list,
# fail_silently=False, auth_user=None, auth_password=None,
# connection=None, html_message=None):
auth_user = user_config.EMAIL_HOST_USER
auth_password = user_config.EMAIL_HOST_PASSWORD
# 创建一个JWT的对象,以SECRET_KEY为加密的参数,过期时间为1h
serializer_obj = Serializer(secret_key=settings.SECRET_KEY, expires_in=3600)
user_info = {"username": username} # 加密的数据dict
user_info = serializer_obj.dumps(user_info) # 加密
token = user_info.decode() # 转码为字符串
subject = '新用户激活邮箱验证'
message = '测试信息'
from_email = settings.EMAIL_FROM
recipient_list = [to_email]
host = settings.SERVER_DOMAIN
html_message = '<h1>欢迎成为博客的第{}位读者</h1>请点击下面链接激活您的账户:<br/> <a href="{}/users/active/{}">{}/users/active/{}</a>'.format(rank, host, token, host, token)
send_mail(subject, message, from_email, recipient_list, auth_user=auth_user, auth_password=auth_password,
html_message=html_message, fail_silently=False)
4. users/views.py
class EmailVerifyView(View):
"""
用户邮箱验证
"""
def get(self,request,token):
serializer_obj = Serializer(settings.SECRET_KEY,3600)
try:
username = serializer_obj.loads(token)
except SignatureExpired:
return HttpResponse("链接已过期")
username = username.decode()
user = models.Users.objects.only("is_active").filter(username=username).first()
user.is_active = True
user.save(update_fields=["is_active"])
return render(request,'users/login.html')
def test(request):
"""测试email激活使用"""
username = "xxx"
to_email = "接收者邮箱"
send_verify_email.delay(username, to_email, 1)
return HttpResponse("邮件已发送")
六、 使用celery 异步任务完成图片上传
1. 使用fastdfs上传到服务器
分析:
- 只需要在将上传图片哪一步放到celery异步任务队列里面就行了
2. 使用百度BOS上传到百度BOS里面
分析:
- 构建一个BOS图片上传对象
- 通过append_object_from_string进行上传图片
3. celery_tasks/upload_images/tasks.py
# -*- coding: utf-8 -*-
"""
@Time : 2020/3/23 18:00
@Author : 半纸梁
@File : tasks.py
"""
from celery_tasks.main import app
from utils.fast.fdfs import client
from utils.upload_image.bd_upload_image import BdUploadImage
@app.task(name='upload_server_images')
def upload_server_images(data, file_ext_name):
"""
图片上传到服务器
:param data: 图片二进制数据
:param file_ext_name: 图片后缀名
:return:
"""
client.upload_by_buffer(data, file_ext_name)
@app.task(name="upload_bos_images")
def upload_bos_images(bos_host, ak, sk, bucket, image_name, data):
"""
:param bos_host: bos域名
:param ak: 百度云access_key_id
:param sk: 百度云secret_access_key
:param bucket: 百度云总图片存储库名
:param image_name: 文件名
:param data: 图片二进制数据
:return:
"""
upload_image = BdUploadImage(bos_host, ak, sk)
res = upload_image.upload(bucket, image_name, data)
return res
bd_upload_image.py
百度云上传图片封装函数,有疑问可以查看百度云BOS存储图片SDK
# -*- coding: utf-8 -*-
"""
@Time : 2020/3/17 17:29
@Author : 半纸梁
@File : bd_upload_image.py
"""
from baidubce.services.bos.bos_client import BosClient
from baidubce.auth.bce_credentials import BceCredentials
from baidubce.bce_client_configuration import BceClientConfiguration
from user_config import user_config
class BdUploadImage(object):
"""
百度云BOS存储图片插件
"""
def __init__(self, bos_host, access_key_id, secret_access_key):
self.bos_host = bos_host
self.config = BceClientConfiguration(credentials=BceCredentials(access_key_id, secret_access_key),endpoint=bos_host)
self.client = BosClient(config=self.config)
def upload(self, bucket, key, data):
"""
图片存储库名、文件名、图片二进制数据
:param bucket:图片存储库名
:param key:文件名
:param data:图片二进制数据
:return:
"""
res = self.client.append_object_from_string(bucket_name=bucket, key=key, data=data)
return res
if __name__ == '__main__':
"""测试内容"""
bos_host = 'https://bj.'
image_name = "测试"
bucket = "banzhiliang"
with open("2018.png", "rb") as f:
data = f.read()
upload_image = BdUploadImage(bos_host, user_config.access_key_id, user_config.secret_access_key)
upload_image.upload(bucket, image_name, data)
|