分享

[Python网络编程]浅析守护进程后台任务的设计与实现

 心不留意外尘 2016-08-26

http://blog.csdn.net/yueguanghaidao/article/details/35572811

2014

在做基于B/S应用中,经常有需要后台运行任务的需求,最简单比如发送邮件。在一些如防火墙,WAF等项目中,前台只是为了展示内容与各种参数配置,后台守护进程才是重头戏。所以在防火墙配置页面中可能会经常看到调用cgi,但真正做事的一般并不是cgi,比如说执行关机命令,他们的逻辑如下:


   (ps:上图所说的前台界面包含通常web开发中的后端,不然也没有socket一说)


    为什么要这么设计

你可能疑惑为什么要这么设计,我觉得理由如下:
首先有一点说明,像防火墙等基本上都运行在类Linux平台上
    1.安全问题  cgi一般也就拥有www权限,但执行关键等命令需要root,所以需要让后台守护进程去干
    2.一般类似防火墙的后台守护进程是C/C++写的,在消息格式上很方便处理,如填充一个消息结构体发送出去,后台进程只需要强制转换为定义的结构体,就轻松获得传递的参数值。

那可不可以去掉中间的cig模块,直接发送消息给后台守护进程呢?
我觉得是可以的,本文的重点也是实现这个方案。

如何实现

由于最近一直在windows下,所以我们的守护进程是运行在windows下的,但其实windows并没有守护进程的概念,相对应的是服务的概念。这里需要安装pywin32包。
[python] view plain copy
  1. class MgrService(win32serviceutil.ServiceFramework):   
  2.     """ 
  3.     Usage: 'python topmgr.py install|remove|start|stop|restart' 
  4.     """  
  5.     #服务名  
  6.     _svc_name_ = "Mgr"  
  7.     #服务显示名称  
  8.     _svc_display_name_ = "Daemon Mgr"  
  9.     #服务描述  
  10.     _svc_description_ = "Daemon Mgr"  
  11.   
  12.     def __init__(self, args):   
  13.         win32serviceutil.ServiceFramework.__init__(self, args)   
  14.         self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)  
  15.   
  16.     def SvcDoRun(self):  
  17.         self.ReportServiceStatus(win32service.SERVICE_START_PENDING)  
  18.         INFO("mgr startting...")  
  19.         self.ReportServiceStatus(win32service.SERVICE_RUNNING)  
  20.         self.start()  
  21.         # 等待服务被停止  
  22.         INFO("mgr waitting...")  
  23.         win32event.WaitForSingleObject(self.hWaitStop, win32event.INFINITE)  
  24.         INFO("mgr end")  
  25.           
  26.     def SvcStop(self):   
  27.         self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)  
  28.         INFO("mgr stopping...")  
  29.         self.stop()  
  30.         INFO("mgr stopped")  
  31.         # 设置事件  
  32.         win32event.SetEvent(self.hWaitStop)  
  33.         self.ReportServiceStatus(win32service.SERVICE_STOPPED)  
  34.   
  35.     def start(self): pass  
  36.   
  37.     def stop(self): pass  
很简单,这样就实现了windows中的服务,也就是说脱离终端,运行于后台。INFO等函数只是简单的记录作用,可直接忽略。
我们要实现自己的后台程序,只需要继承MgrService,并提供start,stop方法就可以了。

由于我们是通过socket来传递消息的,所以在start方法中要监听端口,等待连接,处理连接,这个大家都很擅长。在这里我选择了
 单线程,基于协程,底层使用libev(libevent)--- gevent这个高性能网络库。对gevent有兴趣的童鞋可以看看深度分析gevent运行流程
[python] view plain copy
  1. class Engine(MgrService):  
  2.     rbufsize = -1  
  3.     wbufsize = 0  
  4.   
  5.     def start(self):  
  6.         INFO('wait connection')  
  7.         self.server = StreamServer((HOST, PORT), self.msg_handle)  
  8.         self.server.serve_forever()  
  9.   
  10.     def msg_handle(self,socket,address):  
  11.         try:  
  12.             rfile = socket.makefile('rb', self.rbufsize)  
  13.             wfile = socket.makefile('wb', self.wbufsize)  
  14.             headers = Message(rfile).dict  
  15.   
  16.             INFO('get a connection from:%s,headers:%s' % (str(address), headers))  
  17.   
  18.             if 'module' in headers and headers['module'] in MODULES:  
  19.                 MODULES[headers['module']].handle(wfile, headers)  
  20.         except Exception:  
  21.             ERROR('msg_handle exception,please check')  
  22.   
  23.     def stop(self):  
  24.         if hasattr(self, server):  
  25.             self.server.stop()  
当有新连接到来,由msg_handle处理,首先读取发送来的消息,消息格式使用了最简单的http的格式,即(键名:键值)的格式,你要问我为什么采用这个格式,哈哈,格式简单,python有现成的库解析。

考虑到后期模块可能很多,所以我们的处理流程自动根据消息的模块参数,调用对应模块的handle方法。
上面代码的那个MODULES是个全局变量,当你添加一个模块的时候需要注册到MODULES中,我提供了module_register方法。
[python] view plain copy
  1. MODULES = {           # module: handle module class  
  2. }  
  3.   
  4. def module_register(module_name, handle_class):  
  5.     if module_name in MODULES:  
  6.         WARN('duplicate module_name:' + module_name)  
  7.     else:  
  8.         MODULES[module_name] = handle_class  

到这里一切都很自然,但貌似只假设模块有handle方法,自己写一个模块还是很费事,你需要自己去想怎么调用,最有返回什么格式的数据,这都是一件头疼的事情,所以最好提供一个基类模块。
[python] view plain copy
  1. class Module(object):  
  2.     SECRE_KEY = "YI-LUO-KEHAN"  
  3.     MODULE_NAME = "BASE_MODULE"  
  4.     PREFIX = "do_"  # method prefix  
  5.   
  6.     def __init__(self, wfile, headers):  
  7.         self.wfile = wfile  
  8.         self.headers = headers  
  9.   
  10.     def __getattr__(self, name):  
  11.         try:  
  12.             return self.headers[name]  
  13.         except Exception:  
  14.             ERROR("%s has no attr:%s,please check" %(self.MODULE_NAME, name))              
  15.  
  16.     @classmethod  
  17.     def handle(cls, wfile, headers):  
  18.         module_obj = cls(wfile, headers)  
  19.         module_obj.schedule_default()  
  20.   
  21.     def verify(self):  
  22.         if hmac.new(self.SECRE_KEY, self.MODULE_NAME).hexdigest() == self.signature:  
  23.             return True  
  24.         else:  
  25.             WARN("client verify failed,signature:%s" % str(self.signature))  
  26.   
  27.     def schedule_default(self):  
  28.         err_code = 0  
  29.         if self.verify() and self.action:  
  30.             func_name = self.PREFIX + self.action  
  31.             try:  
  32.                 getattr(self, func_name)()  
  33.             except AttributeError:  
  34.                 err_code = 1  
  35.                 ERROR("%s has no method:%s" %(self.MODULE_NAME, func_name))  
  36.             except Exception:  
  37.                 err_code = 2  
  38.                 ERROR("module:%s,method:%s,exception" % (self.MODULE_NAME, func_name))                
  39.         else:  
  40.             err_code = 3  
  41.   
  42.         if err_code:  
  43.             self.send_error({'err_code':err_code})  
  44.   
  45.     def send_success(self, msg=''):  
  46.         data = {'success':True,'msg':msg}  
  47.         self.wfile.write(json.dumps(data))  
  48.   
  49.     def send_error(self, msg=''):  
  50.         data = {'success':False,'msg':msg}  
  51.         self.wfile.write(json.dumps(data))  

在基类模块中我们提供了默认的处理流程,即根据消息中action,调用do_action方法,并提供了一个简单但很有效的认证方法,通过消息的signature字段,可能有些简陋,但没关系,你可以定义自己的认证方法。

下面该写我们自己的模块了,
[python] view plain copy
  1. TASK = {}  # task_id: pid  
  2. class ScanModule(Module):  
  3.     MODULE_NAME = "SCAN_MODULE"  
  4.   
  5.     def do_start(self):  
  6.         self.send_success('start ok')  
  7.         DEBUG('------------task start------------')  
  8.         task_ids = [int(task_id) for task_id in self.task_ids.split(',') if int(task_id) not in TASK]  
  9.   
  10.         for task_id in task_ids:  
  11.             try:  
  12.                 cmd = 'python scan.py -t %s' % task_id  
  13.                 DEBUG(cmd)  
  14.                 self.sub = Popen(cmd, shell=True, cwd=CWD)  
  15.                 pid = int(self.sub.pid)  
  16.                 TASK[task_id] = pid  
  17.                 INFO('%s start a new task,task_id:%s,pid:%s' %(self.MODULE_NAME, task_id, pid))  
  18.             except Exception:  
  19.                 ERROR('%s start a new task,task_id:%s failed' % (self.MODULE_NAME, task_id))  
  20.   
  21.     def do_stop(self):  
  22.         self.send_success('stop ok')  
  23.         DEBUG('------------task stop------------')  
  24.         task_ids = [int(task_id) for task_id in self.task_ids.split(',') if int(task_id) in TASK]  
  25.   
  26.         for task_id in task_ids:  
  27.             pid = TASK.pop(task_id)  
  28.             try:  
  29.                 INFO('%s stop a new task,task_id:%s,pid:%s' %(self.MODULE_NAME, task_id, pid))  
  30.                 call(['taskkill', '/F', '/T', '/PID', str(pid)])  
  31.             except Exception:  
  32.                 ERROR('%s taskkill a task failed,task_id:%s,pid:%s' %(self.MODULE_NAME, task_id, pid))  
  33.   
  34.   
  35. module_register(ScanModule.MODULE_NAME, ScanModule)  
上面实现了一个简单的扫描模块,支持两个action,start,stop。
start很简单,调用gevent的subprocess.Popen运行子进程,并记录pid,stop则使用taskkill直接杀掉该进程。
这里有两点需要注意:
    1.不要用原生的subprocess模块,因为原生的subprocess是阻塞的,这可能导致主处理逻辑也阻塞,不能服务更多的请求
最后别忘了调用module_register注册相应模块。
    2.方法一开始最好就返回结果,因为前台很可能在等待返回。所以说as soon as possible

下面提供一个客户端用于测试,client.py
[python] view plain copy
  1. #!/usr/bin/env python  
  2. #-*-encoding:UTF-8-*-  
  3.   
  4. import hmac  
  5. import gevent  
  6. from gevent import monkey  
  7. monkey.patch_socket()  
  8.   
  9. addr = ('localhost', 6667)  
  10.   
  11.   
  12. def send_request(module_name,request_headers):  
  13.     SECRE_KEY = "YI-LUO-KEHAN"  
  14.     socket = gevent.socket.socket()  
  15.     socket.connect(addr)  
  16.     request_headers['module'] = module_name  
  17.     request_headers['signature'] = hmac.new(SECRE_KEY, module_name).hexdigest()  
  18.     h = ["%s:%s" %(k, v) for k,v in request_headers.iteritems()]  
  19.     h.append('\n')  
  20.     request = '\n'.join(h)  
  21.     socket.send(request)  
  22.     print socket.recv(8192)  
  23.     socket.close()  
  24.   
  25. if __name__ =="__main__":  
  26.     import sys  
  27.     if sys.argv[1] == 'start':  
  28.         send_request('SCAN_MODULE',{'action':'start','task_ids':'1'})  
  29.     else:  
  30.         send_request('SCAN_MODULE',{'action':'stop','task_ids':'1'})  
  31.   
  32.       
  33.       

我们来简单的测试一下:
注意:由于要注册到服务,cmd需要管理员权限
至于start中调用的scan.py随便写一个就可以

截图如下,我们看到成功!!!


本文代码已放到github,https://github.com/Skycrab/pymgr
感兴趣的童鞋可以参考,请大家多提意见。




    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约