目录 一、客户端实现 二、单进程服务器 2.1 单进程实现 2.2 单进程非阻塞实现 2.3 TCP服务器(select版) 2.4 epoll版服务器实现 三、多进程服务器和多线程服务器 四、协程 4.1 协程的生成器实现 4.2 协程的greenlet实现 4.3 协程的gevent实现 4.3.1 gevent的使用 4.3.2 gevent的切换执行 4.3.3 gevent的服务器实现 一、客户端实现 客户端比较简单,并且适用于与不同服务器通信,代码如下: #coding=utf-8 from socket import * import random import time serverIp = raw_input('请输⼊服务器的ip:') connNum = raw_input('请输⼊要链接服务器的次数(例如1000):') g_socketList = [] for i in range(int(connNum)): s = socket(AF_INET, SOCK_STREAM) s.connect((serverIp, 7788)) g_socketList.append(s) print(i) while True: for s in g_socketList: s.send(str(random.randint(0,100))) # ⽤来测试⽤ #time.sleep(1)
二、单进程服务器 2.1 单进程实现 Linux服务器开发学习视频资料,包括Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK等等,需要知识技术学习视频文档资料的朋友可以后台私信【架构】获取
单进程完成一个tcp服务器,同时只能为一个客户端服务。 from socket import * serSocket = socket(AF_INET, SOCK_STREAM) # 重复使⽤绑定的信息,若服务器先close,则不用等待2MSL,可以直接绑定下一个客户端 serSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR , 1) localAddr = ('', 7788) serSocket.bind(localAddr) serSocket.listen(5) while True: print('-----主进程, , 等待新客户端的到来------') newSocket,destAddr = serSocket.accept() print('-----主进程, , 接下来负责数据处理[%s]-----'%str(destAddr)) try: while True: recvData = newSocket.recv(1024) if len(recvData)>0: print('recv[%s]:%s'%(str(destAddr), recvData)) else: print('[%s]客户端已经关闭'%str(destAddr)) break finally: newSocket.close() serSocket.close()
2.2 单进程非阻塞实现 上面单进程实现同时只能为一个服务端服务,如果第二个while中不阻塞,则可以实现多用户同时服务。代码如下: #coding=utf-8 from socket import * import time # ⽤来存储所有的新链接的socket g_socketList = [] def main(): serSocket = socket(AF_INET, SOCK_STREAM) serSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR , 1) localAddr = ('', 7788) serSocket.bind(localAddr) #可以适当修改listen中的值来看看不同的现象 serSocket.listen(1000) #将套接字设置为⾮堵塞 #设置为⾮堵塞后, 如果accept时, 恰巧没有客户端connect, 那么accept会 #产⽣⼀个异常, 所以需要try来进⾏处理 serSocket.setblocking(False) while True: #⽤来测试 #time.sleep(0.5) try: newClientInfo = serSocket.accept() except Exception as result: pass else: print('⼀个新的客户端到来:%s'%str(newClientInfo)) newClientInfo[0].setblocking(False) g_socketList.append(newClientInfo) # ⽤来存储需要删除的客户端信息 needDelClientInfoList = [] # 为列表中每个客户端服务 for clientSocket,clientAddr in g_socketList: try: recvData = clientSocket.recv(1024) if len(recvData)>0: print('recv[%s]:%s'%(str(clientAddr), recvData)) else: print('[%s]客户端已经关闭'%str(clientAddr)) clientSocket.close() g_needDelClientInfoList.append((clientSocket,clientAddr)) except Exception as result: pass for needDelClientInfo in needDelClientInfoList: g_socketList.remove(needDelClientInfo) if __name__ == '__main__': main()
2.3 TCP服务器(select版) tcp/ip学习资料获取后台私信【tcp/ip】
在非阻塞版本中使用for循环为列表中的每个客户端服务,而select版是通过调用select函数直接返回列表中接收到数据的socket,不必循环遍历。 优点:几乎所有平台都支持,有良好的跨平台性。 缺点:select的⼀个缺点在于单个进程能够监视的⽂件描述符的数量存在最⼤限制,在Linux上⼀般为1024, 可以通过修改宏定义甚⾄重新编译内核的⽅式提升这⼀限制, 但是这样也会造成效率的降低。 ⼀般来说这个数⽬ 和系统内存关系很⼤, 具体数⽬ 可以cat /proc/sys/fs/filemax查看。 32位机默认是1024个。 64位机默认是2048.个。对socket进⾏扫描时是依次扫描的, 即采⽤轮询的⽅法, 效率较低。 当套接字⽐较多的时候, 每次select()都要通过遍历FD_SETSIZE个Socket来完成调度, 不管哪个Socket是活跃的, 都遍历⼀遍。 这会浪费很多CPU时间。 select函数解释如图:
select版tcp服务器代码如下: import select import socket import sys server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind(('', 7788)) server.listen(5) inputs = [server, sys.stdin] running = True while True: # 调用 select 函数,阻塞等待 readable, writeable, exceptional = select.select(inputs, [], []) # 数据抵达,循环 for sock in readable: # 监听到有新的连接 if sock == server: conn, addr = server.accept() # select 监听的socket inputs.append(conn) # 监听到键盘有输入 elif sock == sys.stdin: cmd = sys.stdin.readline() running = False break # 有数据到达 else: # 读取客户端连接发送的数据 data = sock.recv(1024) if data: sock.send(data) else: # 移除select监听的socket inputs.remove(sock) sock.close() # 如果检测到用户输入敲击键盘,那么就退出 if not running: break server.close()
2.4 epoll版服务器实现 为了解决select版并发连接数目的限制,出现了poll版,与select版几乎相同,唯一不同的是数量不受限制,仍是用的轮询方式。后来为了解决poll版轮询监测方式低下的问题出现了epoll版,epoll版相当于“有问题举手”,而不是“挨个问是否有问题”。 epoll版的优点: 1. 没有最⼤并发连接的限制, 能打开的FD(指的是⽂件描述符), 通俗的理解就是套接字对应的数字编号)的上限远⼤于1024 2. 效率提升, 不是轮询的⽅式, 不会随着FD数⽬的增加效率下降。 只有活跃可⽤的FD才会调⽤callback函数; 即epoll最⼤的优点就在于它只管你“活跃”的连接, ⽽跟连接总数⽆关, 因此在实际的⽹络环境中, epoll的效率就会远远⾼于select和poll。 epoll版tcp服务器代码如下: 代码解释: epoll的三种事件: EPOLLIN (可读) EPOLLOUT (可写) EPOLLET (ET模式) epoll对⽂件描述符的操作有两种模式: LT(level trigger 水平触发) 和ET(edge trigger 边沿触发) 。 LT模式是默认模式, LT模式与ET模式的区别如下: LT模式: 当epoll检测到描述符事件发⽣并将此事件通知应⽤程序, 应⽤程序可以不⽴即处理该事件 ET模式: 当epoll检测到描述符事件发⽣并将此事件通知应⽤程序, 应⽤程序必须⽴即处理该事件,否则会丢失 import socket import select # 创建套接字 s = socket.socket(socket.AF_INET,socket.SOCK_STREAM) # 设置可以重复使⽤绑定的信息 s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) # 绑定本机信息 s.bind(('',7788)) # 变为被动 s.listen(10) # 创建⼀个epoll对象 epoll=select.epoll() # 测试, ⽤来打印套接字对应的⽂件描述符 # print s.fileno() # print select.EPOLLIN|select.EPOLLET # 注册事件到epoll中 # epoll.register(fd, [eventmask]) # 注意, 如果fd已经注册过, 则会发⽣异常 # 将创建的socket添加到epoll的事件监听中 # [eventmask]为监听的事件列表,有列表中的事件时才会放入epoll列表, # 事件有三种,EPOLLIN接收数据事件,EPOLLOUT发送数据,EPOLLET模式(水平触发或边沿触发) epoll.register(s.fileno(),select.EPOLLIN|select.EPOLLET) # connections用于存储socket,addresses用于存储端口, # 它们都为字典,key为socket的文件描述符,value为socket或端口! connections = {} addresses = {} # 循环等待客户端的到来或者对⽅发送数据 while True: # epoll 进⾏ fd 扫描的地⽅ -- 未指定超时时间则为阻塞等待 # 等价于select版本中的 readable,xxx,yyy = select([],[],[]) # 不为轮询,使用的是事件通知机制,为本代码的核心 epoll_list=epoll.poll() # 对事件进⾏判断 for fd,events in epoll_list: # print fd # print events # 如果是socket创建的套接字被激活 if fd == s.fileno(): conn,addr=s.accept() print('有新的客户端到来%s'%str(addr)) # 将 conn 和 addr 信息分别保存起来 # 注意connections和address为字典,以key、value存储 connections[conn.fileno()] = conn addresses[conn.fileno()] = addr # 向 epoll 中注册 连接 socket 的 可读 事件 epoll.register(conn.fileno(), select.EPOLLIN | select.EPOLLET) elif events == select.EPOLLIN: # 从激活 fd 上接收 recvData = connections[fd].recv(1024) if len(recvData)>0: print('recv:%s'%recvData) else: # 从 epoll 中移除该 连接 fd epoll.unregister(fd) # server 则主动关闭该 连接 fd connections[fd].close() print('%s---offline---'%str(addresses[fd]))
三、多进程服务器和多线程服务器 代码说明: 1、多进程实现和多线程实现几乎相同,不同点:1、创建的时候;2、while中多进程实现中,由于子进程复制了一份,所以可以关闭,多线程中,子线程之间共享资源,所以在while中不能关闭。 2、代码中使用try...finally,目的是可以使用Ctrl+C强制结束进程或线程。 多进程服务器代码如下: #coding=utf-8 from socket import * from multiprocessing import * from time import sleep # 处理客户端的请求并为其服务 def dealWithClient(newSocket,destAddr): while True: recvData = newSocket.recv(1024) if len(recvData)>0: print('recv[%s]:%s'%(str(destAddr), recvData)) else: print('[%s]客户端已经关闭'%str(destAddr)) break newSocket.close() def main(): serSocket = socket(AF_INET, SOCK_STREAM) serSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR , 1) localAddr = ('', 7788) serSocket.bind(localAddr) serSocket.listen(5) try: while True: print('-----主进程,,等待新客户端的到来------') newSocket,destAddr = serSocket.accept() print('-----主进程,,接下来创建⼀个新的进程负责数据处理------') client = Process(target=dealWithClient, args=(newSocket,destAddr)) client.start() #因为已经向⼦进程中copy了⼀份(引⽤),并且⽗进程中这个套接字所以关闭 newSocket.close() finally: #当为所有的客户端服务完之后再进⾏关闭,表示不再接收新的客户端的链接 serSocket.close() if __name__ == '__main__': main()
多线程服务器代码如下: #coding=utf-8 from socket import * from threading import Thread from time import sleep # 处理客户端的请求并执⾏事情 def dealWithClient(newSocket,destAddr): while True: recvData = newSocket.recv(1024) if len(recvData)>0: print('recv[%s]:%s'%(str(destAddr), recvData)) else: print('[%s]客户端已经关闭'%str(destAddr)) break newSocket.close() def main(): serSocket = socket(AF_INET, SOCK_STREAM) serSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR , 1) localAddr = ('', 7788) serSocket.bind(localAddr) serSocket.listen(5) try: while True: print('-----主进程, , 等待新客户端的到来------') newSocket,destAddr = serSocket.accept() print('-----主进程, , 接下来创建⼀个新的进程负责数据处理[%s]-----') client = Thread(target=dealWithClient, args=(newSocket,destAddr)) client.start() #因为线程中共享这个套接字, 如果关闭了会导致这个套接字不可⽤, #但是此时在线程中这个套接字可能还在收数据, 因此不能关闭 #newSocket.close() finally: serSocket.close() if __name__ == '__main__': main()
四、协程 协程学习资后台私信【协程】获取文档代码
进程里面有线程,线程里面有协程。协程不牵扯到切换,并且能完成多任务。 注意:计算密集型时用多进程;IO密集型使用多线程、多协程。 通俗的理解: 在⼀个线程中的某个函数, 可以在任何地⽅保存当前函数的⼀些临时变量等信息, 然后切换到另外⼀个函数中执⾏, 注意不是通过调⽤函数的⽅式做到的, 并且切换的次数以及什么时候再切换到原来的函数都由开发者⾃⼰确定。 协程和线程的区别:线程切换从系统层⾯远不⽌保存和恢复 CPU上下⽂这么简单。 操作系统为了程序运营的⾼效性每个线程都有⾃⼰缓存Cache等等数据, 操作系统还会帮你做这些数据的恢复操作。所以线程的切换⾮常耗性能。 但是协程的切换只是单纯的操作CPU的上下⽂, 所以⼀秒钟切换个上百万次系统都抗的住。 协程调度:操作系统不感知协程,所以操作系统不会对协程调度。 ⽬前的协程框架⼀般都是设计成 1:N 模式。 所谓 1:N 就是⼀个线程作为⼀个容器⾥⾯放置多个协程。 那么谁来适时的切换这些协程? 答案是有协程自己主动让出CPU, 也就是每个协程池⾥⾯有⼀个调度器, 这个调度器是被动调度的。 意思就是他不会主动调度。 ⽽且当⼀个协程发现自己执行不下去了(比如异步等待⽹络的数据回来, 但是当前还没有数据到), 这个时候就可以由这个协程通知调度器, 这个时候执⾏到调度器的代码, 调度器根据事先设计好的调度算法找到当前最需要CPU的协程。 切换这个协程的CPU上下⽂把CPU的运⾏权交给这个协程, 直到这个协程出现执行不下去需要等等的情况, 或 者它调⽤主动让出CPU的API之类, 触发下⼀次调度。 协程调度存在问题:假设一个线程中有⼀个协程是CPU密集型的他没有IO操作, 也就是自己不会主动触发调度器调度的过程, 那么就会出现其他协程得不到执⾏的情况, 所以这种情况下需要程序员⾃ ⼰避免。 这是⼀个问题, 假设业务开发的⼈员并不懂这个原理的话就可能会出现问题。 协程的优点:在IO密集型的程序中由于IO操作远远慢于CPU的操作, 所以往往需要CPU去等IO操作。 同步IO下系统需要切换线程, 让操作系统可以在IO过程中执⾏其他的东⻄。 这样虽然代码是符合⼈类的思维习惯但是由于⼤量的线程切换带来了⼤量的性能的浪费, 尤其是IO密集型的程序。 所以⼈们发明了异步IO。 就是当数据到达的时候触发我的回调。 来减少线程切换带来性能损失。 但是这样的坏处也是很⼤的, 主要的坏处就是操作被“分片” 了, 代码写的不是 “一气呵成” 这种。 而是每次来段数据就要判断 数据够不够处理, 够处理就处理, 不够处理就再等等。 这样代码的可读性很低, 其实也不符合⼈类的习惯。 但是协程可以很好解决这个问题。 比如 把⼀个IO操作 写成⼀个协程。 当触发IO操作的时候就自动让出CPU给其他协程。 要知道协程的切换很轻的。 协程通过这种对异步IO的封装 既保留了性能也保证了代码的容易编写和可读性。在⾼IO密集型的程序下很好。 但是高CPU密集型的程序下没啥好处。 4.1 协程的生成器实现 协程使用生成器来实现的,代码如下(只切换了函数调用,所以效率比较高): import time def A(): while True: print('----A---') yield time.sleep(0.5) def B(c): while True: print('----B---') c.next() time.sleep(0.5) if __name__=='__main__': a = A() B(a)
结果如下: --B-- --A-- --B-- --A-- --B-- --A-- --B-- --A-- ...省略...
4.2 协程的greenlet实现 与生成器实现类似。 注意:进程、线程的调用是操作系统决定的,执行顺序不可预测,而协程是程序员决定的执行顺序可预测,这由以下代码可知(当执行到xx.switch()时会切换)。 使用下面命令安装greenlet: sudo pip install greenlet #python2的安装方式 sudo pip3 install greenlet #python3的安装方式
协程的greenlet实现代码如下: #coding=utf-8 from greenlet import greenlet import time def test1(): while True: print '---A--' gr2.switch() # 切换到gr2(即test2)中执行,test2执行切换时会从当前接着执行 time.sleep(0.5) def test2(): while True: print '---B--' gr1.switch() # 切换到gr1(即test1)中执行,test1执行切换时会从当前接着执行 time.sleep(0.5) gr1 = greenlet(test1) gr2 = greenlet(test2) #切换到gr1(即test1函数)中执行 gr1.switch()
结果如下: --A-- --B-- --A-- --B-- --A-- --B-- --A-- ...省略...
4.3 协程的gevent实现 gevent是对greenlet的再次封装,不用程序员自己编程切换,当遇到需要切换的地方会自动切换。 4.3.1 gevent的使用 #coding=utf-8 #请使⽤python 2 来执⾏此程序 import gevent def f(n): for i in range(n): print gevent.getcurrent(), i g1 = gevent.spawn(f, 5) # 绑定f函数,执行5次 g2 = gevent.spawn(f, 5) g3 = gevent.spawn(f, 5) # 清除协程 g1.join() g2.join() g3.join()
执行结果: 一瞬间执行完毕,g1、g2、g3依次顺序执行,并非交替执行,不是我们想要的结果。 <Greenlet at 0x10e49f550: f(5)> 0 <Greenlet at 0x10e49f550: f(5)> 1 <Greenlet at 0x10e49f550: f(5)> 2 <Greenlet at 0x10e49f550: f(5)> 3 <Greenlet at 0x10e49f550: f(5)> 4 <Greenlet at 0x10e49f910: f(5)> 0 <Greenlet at 0x10e49f910: f(5)> 1 <Greenlet at 0x10e49f910: f(5)> 2 <Greenlet at 0x10e49f910: f(5)> 3 <Greenlet at 0x10e49f910: f(5)> 4 <Greenlet at 0x10e49f4b0: f(5)> 0 <Greenlet at 0x10e49f4b0: f(5)> 1 <Greenlet at 0x10e49f4b0: f(5)> 2 <Greenlet at 0x10e49f4b0: f(5)> 3 <Greenlet at 0x10e49f4b0: f(5)> 4
4.3.2 gevent的切换执行 上面顺序执行的原因是在f函数中没有调用延时,所以不会切换。gevent当遇到耗时操作时才会切换,所以增加一个延时函数使它能够切换,代码如下: import gevent def f(n): for i in range(n): print gevent.getcurrent(), i #⽤来模拟⼀个耗时操作, 注意不是time模块中的sleep gevent.sleep(1) g1 = gevent.spawn(f, 5) g2 = gevent.spawn(f, 5) g3 = gevent.spawn(f, 5) g1.join() g2.join() g3.join()
结果如下: <Greenlet at 0x7fa70ffa1c30: f(5)> 0 <Greenlet at 0x7fa70ffa1870: f(5)> 0 <Greenlet at 0x7fa70ffa1eb0: f(5)> 0 <Greenlet at 0x7fa70ffa1c30: f(5)> 1 <Greenlet at 0x7fa70ffa1870: f(5)> 1 <Greenlet at 0x7fa70ffa1eb0: f(5)> 1 <Greenlet at 0x7fa70ffa1c30: f(5)> 2 <Greenlet at 0x7fa70ffa1870: f(5)> 2 <Greenlet at 0x7fa70ffa1eb0: f(5)> 2 <Greenlet at 0x7fa70ffa1c30: f(5)> 3 <Greenlet at 0x7fa70ffa1870: f(5)> 3 <Greenlet at 0x7fa70ffa1eb0: f(5)> 3 <Greenlet at 0x7fa70ffa1c30: f(5)> 4 <Greenlet at 0x7fa70ffa1870: f(5)> 4 <Greenlet at 0x7fa70ffa1eb0: f(5)> 4
4.3.3 gevent的服务器实现 注意:要使用gevent实现服务器,不能使用默认的socket,而是使用gevent自己的socket,gevent将常用的耗时操作都重写了一遍,用于检测是否为耗时操作。 import sys import time import gevent from gevent import socket,monkey # 此语句会将本代码改写,位于编译器级的,具体不清楚!(python为动态语言在执行中可以修改) # 必须使用!!! monkey.patch_all() def handle_request(conn): while True: #--------------#1处#----------------- data = conn.recv(1024) # 这是gevent中的recv,为耗时操作,会切换到2处! if not data: conn.close() break print('recv:', data) conn.send(data) def server(port): s = socket.socket() s.bind(('', port)) s.listen(5) while True: #--------------#2处#----------------- cli, addr = s.accept() # 这是gevent中的accept,为耗时操作,会进行切换! # 注意:第一次到这里时只有一个协程,不需要切换,在此等待! # 不为第一次时切换到1处! gevent.spawn(handle_request, cli) if __name__ == '__main__': server(7788)
|