分享

400行python 教你写个高性能 http服务器+web框架,性能秒胜tornado django webpy uwsgi

 java_laq小馆 2014-09-27
2014-05-20
web框架http服务器tornadodjangouwsgi
tornado 4kqps 多进程 1w
nginx+tornado 9kqps
nginx+uwsgi 8kqps (注意:没说比nginx快,只是这几个web框架不行) 
本server 3.2w qps 
没用任何python加速

不相信的可以自己压测下哦
已经真实使用到自己的多个项目之中,效果明显
有需要优化的地方或者建议欢迎联系 qq 512284622

简单说下原理 多进程+多线程的 半同步半异步模式
相信很多朋友看过memcache libevent的会问:
为什么不一个线程负责监听,另外线程负责读写,中间再用管道进行通信,
这是因为:
1、epoll 的所有事件包括 add modify 和wait 都是线程安全的,可以多个线程同时对一个epoll进行操作,
所以这里我们可以业务线程处理完后立即modify epoll事件,而不用通过队列管道等,都放到一个线程去操作epoll事件
2、linux现在的epoll 已经解决惊群问题,可以多个进程多个epoll同时对一个sock进行epollwait,
而不用只用一个进程或线程去单纯负责监听
因此现在不需要libevent 那种一个线程负责监听 另外线程负责处理读写,中间通过管道交互的复杂方式,代码简洁。
另外感谢python强大库 所以才能做到400多行完成一个异步高性能http服务器。

用户文档:

1、启动:
指定监听端口即可启动
python server.py 8992
2、快速编写cgi,支持运行时修改,无需重启server
在server.py同一目录下
随便建一个python 文件
例如:
example.py
定义一个tt函数:
则请求该函数的url为 http://ip:port/example.tt
修改后保存,即可访问,无需重启
example.py
def tt(request,response_head):
#print request.form
#print request.getdic
#print request.postdic
#itemname = request.form["file"].filename
#value = request.form["file"].file.read()
return "ccb"+request.path

函数必须带两个参数
request:表示请求的数据 默认带以下属性
headers: 头部 (字典)
form:  multipart/form表单 (字典)
getdic: url参数 (字典)
postdic: httpbody参数 (字典)
rfile: 原始http content内容  (字符串)
action: python文件名 (这里为example)
method: 函数方法    (这里为tt)
command:  (get or post)
path: url (字符串)
http_version: http版本号 (http 1.1)
response_head: 表示response内容的头部
例如如果要返回用gzip压缩
则增加头部
response_head["Content-Encoding"] = "gzip"

3、下载文件
默认静态文件放在static文件夹下
例如把a.jpg放到static文件夹下
访问的url为 http://ip:port/static/a.jpg
支持etag 客户端缓存功能
支持range 支持断点续传
(server 使用sendfile进行文件发送,不占内存且快速)

4、支持网页模板编写
创建一个模板 template.html
<HTML>
<HEAD><TITLE>$title</TITLE></HEAD>
<BODY>
$contents
</BODY>
</HTML>

则对应的函数:
def template(request,response_head):
t = Template(file="template.html")
t.title  = "my title"
t.contents  = "my contents"
return str(t)
模板实现使用了python最快速Cheetah开源模板,
性能约为webpy django thinkphp等模板的10倍以上:

http://my.oschina.net/whp/blog/112296

example.py
import os
import imp
import sys
import time
import gzip
from StringIO import StringIO
from Cheetah.Template import Template
def tt(request,response_head):
#print request.form
#print request.getdic
#print request.postdic
return "ccb"+request.path
def getdata(request,response_head):
f=open("a.txt")
content = f.read()
f.close()
response_head["Content-Encoding"] = "gzip"
return content
def template(request,response_head):
t = Template(file="template.html")
t.title  = "my title"
t.contents  = "my contents"
response_head["Content-Encoding"] = "gzip"

return str(t)

[python]view plaincopy
  1. #!/usr/bin/python  

  2. #-*- coding:utf-8 -*-  

  3. # Author      : hemingzhe <512284622@qq.com>; xiaorixin<xiaorx@live.com>  


  4. import socket, logging  

  5. import select, errno  

  6. import os  

  7. import sys  

  8. import traceback  

  9. import gzip  

  10. from StringIO import StringIO  

  11. import Queue  

  12. import threading  

  13. import time  

  14. import thread  

  15. import cgi  

  16. from cgi import parse_qs  

  17. import json  

  18. import imp  

  19. from os.path import join, getsize  

  20. import md5  

  21. import re  


  22. logger = logging.getLogger("network-server")  

  23. action_dic = {}  

  24. action_time = {}  

  25. listfile = os.listdir("./")  

  26. for l in listfile:  

  27. prefixname, extname = os.path.splitext(l)  

  28. if extname == ".py":  

  29. action = __import__(prefixname)  

  30. mtime = os.path.getmtime(l)  

  31. action_time[prefixname] = mtime  

  32. action_dic[prefixname] = action  

  33. static_file_dir = "static"  

  34. static_dir = "/%s/" % static_file_dir  

  35. cache_static_dir = "cache_%s" % static_file_dir  

  36. if not os.path.exists(cache_static_dir):  

  37. os.makedirs(cache_static_dir)  

  38. filedic = {"HTM":None,"HTML":None,"CSS":None,"JS":None,"TXT":None,"XML":None}  


  39. def getTraceStackMsg():  

  40. tb = sys.exc_info()[2]  

  41. msg = ''  

  42. for i in traceback.format_tb(tb):  

  43. msg += i  

  44. return msg  


  45. def md5sum(fobj):  

  46. m = md5.new()  

  47. while True:  

  48. d = fobj.read(65536)  

  49. if not d:  

  50. break  

  51. m.update(d)  

  52. return m.hexdigest()  


  53. class QuickHTTPRequest():  

  54. def __init__(self, data):  

  55. headend = data.find("\r\n\r\n")  

  56. rfile = ""  

  57. if headend > 0:  

  58. rfile = data[headend+4:]  

  59. headlist = data[0:headend].split("\r\n")  

  60. else:  

  61. headlist = data.split("\r\n")  

  62. self.rfile = StringIO(rfile)  

  63. first_line = headlist.pop(0)  

  64. self.command, self.path, self.http_version =  re.split('\s+', first_line)  

  65. indexlist = self.path.split('?')  

  66. self.baseuri = indexlist[0]  

  67. indexlist = self.baseuri.split('/')  

  68. while len(indexlist) != 0:  

  69. self.index = indexlist.pop()  

  70. if self.index == "":  

  71. continue  

  72. else:  

  73. self.action,self.method = os.path.splitext(self.index)  

  74. self.method = self.method.replace('.', '')  

  75. break  

  76. self.headers = {}  

  77. for item in headlist:  

  78. if item.strip() == "":  

  79. continue  

  80. segindex = item.find(":")  

  81. if segindex < 0:  

  82. continue  

  83. key = item[0:segindex].strip()  

  84. value = item[segindex+1:].strip()  

  85. self.headers[key] = value  

  86. c_low = self.command.lower()  

  87. self.getdic = None  

  88. self.form = None  

  89. self.postdic = None  

  90. if c_low  == "get" and "?" in self.path:  

  91. self.getdic = parse_qs(self.path.split("?").pop())  

  92. elif c_low == "post" and self.headers.get('Content-Type',"").find("boundary") > 0:  

  93. self.form = cgi.FieldStorage(fp=self.rfile,headers=None,  

  94. environ={'REQUEST_METHOD':self.command,'CONTENT_TYPE':self.headers['Content-Type'],})  

  95. if self.form == None:  

  96. self.form = {}  

  97. elif c_low == "post":  

  98. self.postdic = parse_qs(rfile)  


  99. def sendfilejob(request, data, epoll_fd, fd):  

  100. try:  

  101. base_filename = request.baseuri[request.baseuri.find(static_dir)+1:]  

  102. cache_filename = "./cache_"+base_filename  

  103. filename = "./"+base_filename  

  104. if not os.path.exists(filename):  

  105. res = "file not found"  

  106. data["writedata"] = "HTTP/1.1 200 OK\r\nContent-Length: %s\r\nConnection:keep-alive\r\n\r\n%s" % (len(res),res)  

  107. else:  

  108. lasttimestr = request.headers.get("If-Modified-Since", None)  

  109. filemd5 = os.path.getmtime(filename)  

  110. timestr = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(filemd5))  

  111. curtime = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(time.time()))  

  112. sock = data["connections"]  

  113. if lasttimestr == timestr and "Range" not in request.headers:  

  114. data["writedata"] = "HTTP/1.1 304 Not Modified\r\nLast-Modified: %s\r\nETag: \"%s\"\r\nDate: %s\r\nConnection:keep-alive\r\n\r\n" % (timestr,filemd5,curtime)  

  115. else:  

  116. ext = request.method  

  117. iszip = False  

  118. Accept_Encoding = request.headers.get("Accept-Encoding", "")  

  119. if ext.upper() in filedic or (ext == "" and "gzip" in Accept_Encoding):  

  120. if not os.path.exists(cache_filename) or \  

  121. os.path.getmtime(cache_filename) < float(filemd5):  

  122. d,f = os.path.split(cache_filename)  

  123. try:  

  124. if not os.path.exists(d):  

  125. os.makedirs(d)  

  126. f_out = gzip.open(cache_filename, 'wb')  

  127. f_out.write(open(filename).read())  

  128. f_out.close()  

  129. except Exception, e:  

  130. print str(e)  

  131. pass  

  132. filename = cache_filename  

  133. iszip = True  


  134. filesize = os.path.getsize(filename)  

  135. if "Range" in request.headers:  

  136. range_value = request.headers["Range"].strip(' \r\n')  

  137. range_value = range_value.replace("bytes=", "")  

  138. start,end = range_value.split('-')  

  139. if end == '':  

  140. end = filesize - 1  

  141. start = int(start)  

  142. end = int(end)  

  143. headstr = "HTTP/1.1 206 Partial Content\r\nLast-Modified: %s\r\nETag: \"%s\"\r\nDate: %s\r\n" % (timestr,filemd5,curtime)  

  144. headstr += "Accept-Ranges: bytes\r\nContent-Range: bytes %s-%s/%s\r\n" % (start,end,filesize)  

  145. else:  

  146. start = 0  

  147. end = filesize - 1  

  148. headstr = "HTTP/1.1 200 OK\r\nLast-Modified: %s\r\nETag: \"%s\"\r\nDate: %s\r\n" % (timestr,filemd5,curtime)  

  149. offset = start  

  150. totalsenlen = end - start + 1  

  151. if totalsenlen < 0:  

  152. totalsenlen = 0  

  153. if iszip:  

  154. headstr += "Content-Encoding: gzip\r\n"  

  155. headstr += "Content-Length: %s\r\nConnection:keep-alive\r\n" % totalsenlen  

  156. headstr += "\r\n"  

  157. f = open(filename)  

  158. f.seek(offset)  

  159. readlen = 102400  

  160. if readlen > totalsenlen:  

  161. readlen = totalsenlen  

  162. firstdata = f.read(readlen)  

  163. headstr += firstdata  

  164. totalsenlen -= len(firstdata)  

  165. data["f"] = f  

  166. data["totalsenlen"] = totalsenlen  

  167. data["writedata"] = headstr  

  168. except Exception, e:  

  169. print str(e)+getTraceStackMsg()  

  170. data["writedata"] = "file not found"  

  171. pass  

  172. try:  

  173. data["readdata"] = ""  

  174. epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT | select.EPOLLERR | select.EPOLLHUP)  

  175. except Exception, e:  

  176. #print str(e)+getTraceStackMsg()  

  177. pass  


  178. class Worker(object):  


  179. def __init__(self):  

  180. pass  


  181. def process(self, data, epoll_fd, fd):  

  182. res = ""  

  183. add_head = ""  

  184. try:  

  185. request = QuickHTTPRequest(data["readdata"])  

  186. except Exception, e:  

  187. res = "http format error"  

  188. try:  

  189. headers = {}  

  190. headers["Content-Type"] = "text/html;charset=utf-8"  

  191. headers["Connection"] = "keep-alive"  

  192. if request.path == "/favicon.ico":  

  193. request.path = "/"+static_file_dir+request.path  

  194. if static_dir in request.path or "favicon.ico" in request.path:  

  195. sendfilejob(request,data,epoll_fd,fd)  

  196. return None  

  197. action = action_dic.get(request.action, None)  

  198. if action == None:  

  199. action = __import__(request.action)  

  200. mtime = os.path.getmtime("./%s.py" % request.action)  

  201. action_time[request.action] = mtime  

  202. action_dic[request.action] = action  

  203. else:  

  204. load_time = action_time[request.action]  

  205. mtime = os.path.getmtime("./%s.py" % request.action)  

  206. if mtime>load_time:  

  207. action = reload(sys.modules[request.action])  

  208. action_time[request.action] = mtime  

  209. action_dic[request.action] = action  


  210. method = getattr(action, request.method)  

  211. res = method(request, headers)  

  212. if headers.get("Content-Encoding","") == "gzip":  

  213. buf = StringIO()  

  214. f = gzip.GzipFile(mode='wb', fileobj=buf)  

  215. f.write(res)  

  216. f.close()  

  217. res = buf.getvalue()  

  218. except Exception, e:  

  219. logger.error(str(e)+getTraceStackMsg())  

  220. res = "page no found"  

  221. try:  

  222. if headers.get("Connection","") != "close":  

  223. data["keepalive"] = True  

  224. res_len = len(res)  

  225. headers["Content-Length"] = res_len  

  226. for key in headers:  

  227. add_head += "%s: %s\r\n" % (key, headers[key])  

  228. data["writedata"] = "HTTP/1.1 200 OK\r\n%s\r\n%s" % (add_head, res)  

  229. data["readdata"] = ""  

  230. epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT | select.EPOLLERR | select.EPOLLHUP)  

  231. except Exception, e:  

  232. print str(e)+getTraceStackMsg()  


  233. def InitLog():  

  234. logger.setLevel(logging.DEBUG)  

  235. fh = logging.FileHandler("network-server.log")  

  236. fh.setLevel(logging.DEBUG)  

  237. ch = logging.StreamHandler()  

  238. ch.setLevel(logging.ERROR)  

  239. formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")  

  240. ch.setFormatter(formatter)  

  241. fh.setFormatter(formatter)  

  242. logger.addHandler(fh)  

  243. logger.addHandler(ch)  


  244. class MyThread(threading.Thread):  

  245. ind = 0  

  246. def __init__(self, threadCondition, shareObject, **kwargs):  

  247. threading.Thread.__init__(self, kwargs=kwargs)  

  248. self.threadCondition = threadCondition  

  249. self.shareObject = shareObject  

  250. self.setDaemon(True)  

  251. self.worker = Worker()  


  252. def processer(self, args, kwargs):  

  253. try:  

  254. param = args[0]  

  255. epoll_fd = args[1]  

  256. fd = args[2]  

  257. self.worker.process(param, epoll_fd, fd)  

  258. except:  

  259. print  "job error:" + getTraceStackMsg()  


  260. def run(self):  

  261. while True:  

  262. try:  

  263. args, kwargs = self.shareObject.get()  

  264. self.processer(args, kwargs)  

  265. except Queue.Empty:  

  266. continue  

  267. except :  

  268. print "thread error:" + getTraceStackMsg()  


  269. class ThreadPool:  

  270. def __init__( self, num_of_threads=10):  

  271. self.threadCondition=threading.Condition()  

  272. self.shareObject=Queue.Queue()  

  273. self.threads = []  

  274. self.__createThreadPool( num_of_threads )  


  275. def __createThreadPool( self, num_of_threads ):  

  276. for i in range( num_of_threads ):  

  277. thread = MyThread( self.threadCondition, self.shareObject)  

  278. self.threads.append(thread)  


  279. def start(self):  

  280. for thread in self.threads:  

  281. thread.start()  


  282. def add_job( self, *args, **kwargs ):  

  283. self.shareObject.put( (args,kwargs) )  


  284. def run_main(listen_fd):  

  285. try:  

  286. epoll_fd = select.epoll()  

  287. epoll_fd.register(listen_fd.fileno(), select.EPOLLIN | select.EPOLLET | select.EPOLLERR | select.EPOLLHUP)  

  288. except select.error, msg:  

  289. logger.error(msg)  


  290. tp = ThreadPool(20)  

  291. tp.start()  


  292. params = {}  

  293. def clearfd(fd):  

  294. try:  

  295. param = params[fd]  

  296. epoll_fd.unregister(fd)  

  297. param["connections"].close()  

  298. f = param.get("f", None)  

  299. if f != None:  

  300. f.close()  

  301. except Exception, e:  

  302. pass  

  303. try:  

  304. del params[fd]  

  305. except Exception, e:  

  306. pass  


  307. last_min_time = -1  

  308. while True:  

  309. epoll_list = epoll_fd.poll()  


  310. for fd, events in epoll_list:  

  311. cur_time = time.time()  

  312. if fd == listen_fd.fileno():  


  313. while True:  

  314. try:  

  315. conn, addr = listen_fd.accept()  

  316. conn.setblocking(0)  

  317. epoll_fd.register(conn.fileno(), select.EPOLLIN | select.EPOLLET | select.EPOLLERR | select.EPOLLHUP)  

  318. conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  

  319. #conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)  

  320. params[conn.fileno()] = {"addr":addr,"writelen":0, "connections":conn, "time":cur_time}  

  321. except socket.error, msg:  

  322. break  

  323. elif select.EPOLLIN & events:  

  324. param = params.get(fd,None)  

  325. if param == None:  

  326. continue  

  327. param["time"] = cur_time  

  328. datas = param.get("readdata","")  

  329. cur_sock = params[fd]["connections"]  

  330. while True:  

  331. try:  

  332. data = cur_sock.recv(102400)  

  333. if not data:  

  334. clearfd(fd)  

  335. break  

  336. else:  

  337. datas += data  

  338. except socket.error, msg:  

  339. if msg.errno == errno.EAGAIN:  

  340. param["readdata"] = datas  

  341. len_s = -1  

  342. len_e = -1  

  343. contentlen = -1  

  344. headlen = -1  

  345. len_s = datas.find("Content-Length:")  

  346. if len_s > 0:  

  347. len_e = datas.find("\r\n", len_s)  

  348. if len_s > 0 and len_e > 0 and len_e > len_s+15:  

  349. len_str = datas[len_s+15:len_e].strip()  

  350. if len_str.isdigit():  

  351. contentlen = int(datas[len_s+15:len_e].strip())  

  352. headend = datas.find("\r\n\r\n")  

  353. if headend > 0:  

  354. headlen = headend + 4  

  355. data_len = len(datas)  

  356. if (contentlen > 0 and headlen > 0 and (contentlen + headlen) == data_len) or \  

  357. (contentlen == -1 and headlen == data_len):  

  358. tp.add_job(param,epoll_fd,fd)  

  359. break  

  360. else:  

  361. clearfd(fd)  

  362. break  

  363. elif select.EPOLLHUP & events or select.EPOLLERR & events:  

  364. clearfd(fd)  

  365. logger.error("sock: %s error" % fd)  

  366. elif select.EPOLLOUT & events:  

  367. param = params.get(fd,None)  

  368. if param == None:  

  369. continue  

  370. param["time"] = cur_time  

  371. sendLen = param.get("writelen",0)  

  372. writedata = param.get("writedata", "")  

  373. total_write_len = len(writedata)  

  374. cur_sock = param["connections"]  

  375. f = param.get("f", None)  

  376. totalsenlen = param.get("totalsenlen", None)  

  377. if writedata == "":  

  378. clearfd(fd)  

  379. continue  

  380. while True:  

  381. try:  

  382. sendLen += cur_sock.send(writedata[sendLen:])  

  383. if sendLen == total_write_len:  

  384. if f != None and totalsenlen != None:  

  385. readmorelen = 102400  

  386. if readmorelen > totalsenlen:  

  387. readmorelen = totalsenlen  

  388. morefiledata = ""  

  389. if readmorelen > 0:  

  390. morefiledata = f.read(readmorelen)  

  391. if morefiledata != "":  

  392. writedata = morefiledata  

  393. sendLen = 0  

  394. total_write_len = len(writedata)  

  395. totalsenlen -= total_write_len  

  396. param["writedata"] = writedata  

  397. param["totalsenlen"] = totalsenlen  

  398. continue  

  399. else:  

  400. f.close()  

  401. del param["f"]  

  402. del param["totalsenlen"]  

  403. if param.get("keepalive", True):  

  404. param["readdata"] = ""  

  405. param["writedata"] = ""  

  406. param["writelen"] = 0  

  407. epoll_fd.modify(fd, select.EPOLLET | select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP)  

  408. else:  

  409. clearfd(fd)  

  410. break  

  411. except socket.error, msg:  

  412. if msg.errno == errno.EAGAIN:  

  413. param["writelen"] = sendLen  

  414. break  

  415. clearfd(fd)  

  416. else:  

  417. continue  

  418. #check time out  

  419. if cur_time - last_min_time > 10:  

  420. last_min_time = cur_time  

  421. objs = params.items()  

  422. for (key_fd,value) in objs:  

  423. fd_time = value.get("time", 0)  

  424. del_time = cur_time - fd_time  

  425. if del_time > 10:  

  426. clearfd(key_fd)  

  427. elif fd_time < last_min_time:  

  428. last_min_time = fd_time  


  429. if __name__ == "__main__":  

  430. reload(sys)  

  431. sys.setdefaultencoding('utf8')  

  432. InitLog()  

  433. port = int(sys.argv[1])  

  434. try:  

  435. listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)  

  436. except socket.error, msg:  

  437. logger.error("create socket failed")  

  438. try:  

  439. listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  

  440. except socket.error, msg:  

  441. logger.error("setsocketopt SO_REUSEADDR failed")  

  442. try:  

  443. listen_fd.bind(('', port))  

  444. except socket.error, msg:  

  445. logger.error("bind failed")  

  446. try:  

  447. listen_fd.listen(1024)  

  448. listen_fd.setblocking(0)  

  449. except socket.error, msg:  

  450. logger.error(msg)  


  451. child_num = 8  

  452. c = 0  

  453. while c < child_num:  

  454. c = c + 1  

  455. newpid = os.fork()  

  456. if newpid == 0:  

  457. run_main(listen_fd)  

  458. run_main(listen_fd)  

来自:http://blog.csdn.net/xiaobaihe0825/article/details/26295207

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约