最近做一个小程序开发任务,主要负责后台部分开发;根据项目需求,需要实现三个定时任务: 1>定时更新微信token,需要2小时更新一次; 2>商品定时上线; 3>定时检测后台服务是否存活; 使用Python去实现这三个任务,这里需要使用定时相关知识点; Python实现定点与定时任务方式比较多,找到下面四中实现方式,每个方式都有自己应用场景;下面来快速介绍Python中常用的定时任务实现方式: 1.循环+sleep; 2.线程模块中Timer类; 3.schedule模块; 4.定时框架:APScheduler 在开始之前先设定一个任务(这样不用依赖外部环境): 1:定时或者定点监测CPU与内存使用率; 2:将时间,CPU,内存使用情况保存到日志文件; 先来实现系统监测功能: 准备工作:安装psutil:pip install psutil 功能实现 #psutil:获取系统信息模块,可以获取CPU,内存,磁盘等的使用情况 import psutil import time import datetime #logfile:监测信息写入文件 def MonitorSystem(logfile = None): #获取cpu使用情况 cpuper = psutil.cpu_percent() #获取内存使用情况:系统内存大小,使用内存,有效内存,内存使用率 mem = psutil.virtual_memory() #内存使用率 memper = mem.percent #获取当前时间 now = datetime.datetime.now() ts = now.strftime('%Y-%m-%d %H:%M:%S') line = f'{ts} cpu:{cpuper}%, mem:{memper}%' print(line) if logfile: logfile.write(line) 代码运行结果: 2019-03-21 14:23:41 cpu:0.6%, mem:77.2% 接下来我们要实现定时监测,比如3s监测一下系统资源使用情况。 最简单使用方式:sleep 这种方式最简单,直接使用while+sleep就可以实现: def loopMonitor(): while True: MonitorSystem() #2s检查一次 time.sleep(3) loopMonitor() 输出结果: 2019-03-21 14:28:42 cpu:1.5%, mem:77.6% 这种方式存在问题:只能处理单个定时任务。 又来了新任务:需要每秒监测网络收发字节,代码实现如下: def MonitorNetWork(logfile = None): #获取网络收信息 netinfo = psutil.net_io_counters() #获取当前时间 now = datetime.datetime.now() ts = now.strftime('%Y-%m-%d %H:%M:%S') line = f'{ts} bytessent={netinfo.bytes_sent}, bytesrecv={netinfo.bytes_recv}' print(line) if logfile: logfile.write(line) MonitorNetWork() 代码执行结果: 2019-03-21 14:47:21 bytessent=169752183, bytesrecv=1107900973 如果我们同时在while循环中监测两个任务会有等待问题,不能每秒监测网络情况。 Timer实现方式 timer最基本理解就是定时器,我们可以启动多个定时任务,这些定时器任务是异步执行,所以不存在等待顺序执行问题。 先来看Timer的基本使用: 导入:from threading import Timer 主要方法: Timer方法说明Timer(interval, function, args=None, kwargs=None)创建定时器cancel()取消定时器start()使用线程方式执行join(self, timeout=None)等待线程执行结束 定时器只能执行一次,如果需要重复执行,需要重新添加任务; 我们先来看基本使用: from threading import Timer #记录当前时间 print(datetime.datetime.now()) #3S执行一次 sTimer = Timer(3, MonitorSystem) #1S执行一次 nTimer = Timer(1, MonitorNetWork) #使用线程方式执行 sTimer.start() nTimer.start() #等待结束 sTimer.join() nTimer.join() #记录结束时间 print(datetime.datetime.now()) 输出结果: 2019-03-21 15:13:36.739798 可以看到,花费时间为3S,但是我们想要做的是每秒监控网络状态;如何处理。 Timer只能执行一次,所以执行完成之后需要再次添加任务,我们对代码进行修改: from threading import Timer 执行结果: 2019-03-21 15:18:21 cpu:1.5%, mem:93.2% 2019-03-21 15:18:21 bytessent=171376522, bytesrecv=1109124678 2019-03-21 15:18:22 bytessent=171382215, bytesrecv=1109128294 2019-03-21 15:18:23 bytessent=171384278, bytesrecv=1109129702 2019-03-21 15:18:24 cpu:1.9%, mem:93.2% 2019-03-21 15:18:24 bytessent=171386341, bytesrecv=1109131110 2019-03-21 15:18:25 bytessent=171388527, bytesrecv=1109132600 2019-03-21 15:18:26 bytessent=171390590, bytesrecv=1109134008 从时间中可以看到,这两个任务可以同时进行不存在等待问题。 Timer的实质是使用线程方式去执行任务,每次执行完后会销毁,所以不必担心资源问题。 调度模块:schedule schedule是一个第三方轻量级的任务调度模块,可以按照秒,分,小时,日期或者自定义事件执行时间; 安装方式: pip install schedule 我们来看一个例子: import datetime 执行结果: do func time : 2019-03-22 08:51:38 do func2 time: 2019-03-22 08:51:39 do func time : 2019-03-22 08:51:39 do func time : 2019-03-22 08:51:40 do func2 time: 2019-03-22 08:51:41 do func time : 2019-03-22 08:51:41 do func time : 2019-03-22 08:51:42 do func2 time: 2019-03-22 08:51:43 do func time : 2019-03-22 08:51:43 do func time : 2019-03-22 08:51:44 do func2 time: 2019-03-22 08:51:45 do func time : 2019-03-22 08:51:45 do func time : 2019-03-22 08:51:46 执行过程分析: >1>因为在jupyter下执行,所以先将schedule任务清空; 第5个顺序执行怎么理解,我们修改func函数,里面添加time.sleep(2) 然后只执行func工作,输出结果: do func time : 2019-03-22 09:00:59 do func time : 2019-03-22 09:01:02 do func time : 2019-03-22 09:01:05 可以看到时间间隔为3S,为什么不是1S? 因为这个按照顺序执行,func休眠2S,循环任务查询休眠1S,所以会存在这个问题。 在我们使用这种方式执行任务需要注意这种阻塞现象。 我们看下schedule模块常用使用方法: #schedule.every(1)创建Job, seconds.do(func)按秒间隔查询并执行 这种方式局限性:如果工作任务回非常耗时就会影响其他任务执行。我们可以考虑使用并发机制配置这个模块使用。 任务框架APScheduler APScheduler是Python的一个定时任务框架,用于执行周期或者定时任务, 可以基于日期、时间间隔,及类似于Linux上的定时任务crontab类型的定时任务; 该该框架不仅可以添加、删除定时任务,还可以将任务存储到数据库中,实现任务的持久化,使用起来非常方便。 安装方式:pip install apscheduler apscheduler组件及简单说明:
来看一个简单例子: import time from apscheduler.schedulers.blocking import BlockingScheduler def func(): now = datetime.datetime.now() ts = now.strftime('%Y-%m-%d %H:%M:%S') print('do func time :',ts) def func2(): #耗时2S now = datetime.datetime.now() ts = now.strftime('%Y-%m-%d %H:%M:%S') print('do func2 time:',ts) time.sleep(2) def dojob(): #创建调度器:BlockingScheduler scheduler = BlockingScheduler() #添加任务,时间间隔2S scheduler.add_job(func, 'interval', seconds=2, id='test_job1') #添加任务,时间间隔5S scheduler.add_job(func2, 'interval', seconds=3, id='test_job2') scheduler.start() dojob() 输出结果: do func time : 2019-03-22 10:32:20 输出结果中可以看到:任务就算是有延时,也不会影响其他任务执行。 APScheduler框架提供丰富接口去实现定时任务,可以去参考官方文档去查看使用方式。 最后选择: 简单总结上面四种定时定点任务实现: 1:循环+sleep方式适合简答测试, 2:timer可以实现定时任务,但是对定点任务来说,需要检查当前时间点; 3:schedule可以定点定时执行,但是需要在循环中检测任务,而且存在阻塞; 4:APScheduler框架更加强大,可以直接在里面添加定点与定时任务; 综合考虑,决定使用APScheduler框架,实现简单,只需要直接创建任务,并将添加到调度器中即可。 |
|
来自: 千锋Python学堂 > 《Python基础教程分享》