分享

聊一聊 Python 的 socket,以及 select、poll、epoll 又是怎么一回事?

 古明地觉O_o 2023-02-27 发布于北京

楔子

之前在介绍 TCP 协议的时候,提到了 Socket,它的中文翻译是套接字。我们说 Socket 是对 TCP/IP 协议的一个封装,可以让我们更方便地使用 TCP/IP 协议,而不用关注背后的原理。并且我们经常使用的 Web 框架,本质上也是一个 Socket。

那么本篇文章我们就以 Python 为例,好好地聊一聊 Socket,而且你也一定知道 IO 多路复用,比如 select, poll, epoll,但它们之间的区别和用法你是否了如指掌呢?下面就带着这些问题,开始本文的内容吧。


什么是 Socket

上面说了,Socket 是操作系统对 TCP/IP 网络协议栈的封装,并提供了一系列的接口,我们通过这些接口可以实现网络通信,而不用关注网络协议的具体细节。

按照现有的网络模型,Socket 并不属于其中的任何一层,但我们可以简单地将 Socket 理解为传输层之上的抽象层,负责连接应用层和传输层。Socket 提供了大量的 API,基于这些 API 我们可以非常方便地使用网络协议栈,在不同主机间进行网络通信。

Linux 一切皆文件,Socket 也不例外,它被称为套接字文件,在使用上和普通文件是类似的。


如何使用 Socket 编程

socket 是什么我们已经知道了,下面来看看如何使用 socket 进行编程。

  • 服务端初始化 socket,此时会得到「主动套接字」;

  • 服务端调用 bind,将套接字绑定在某个 IP 和端口上;

  • 服务端调用 listen 进行监听,此时「主动套接字」会变成「监听套接字」;

  • 服务端调用 accept,等待客户端连接,此时服务端会阻塞在这里(调用的是阻塞的 API);

  • 客户端同样初始化 socket,得到主动套接字;

  • 客户端调用主动套接字的 connect,向服务器端发起连接请求,如果连接成功,后续客户端就用这个主动套接字进行数据的传输;

  • 当客户端来连接时,那么服务端的 accept 将不再阻塞,并返回「已连接套接字」,后续服务端便用这个已连接套接字和客户端进行数据传输;

  • 如果客户端断开连接,那么服务端 read 读取数据的时候就会出现 EOF,知道客户端断开连接了。待数据处理完毕后,服务端也要调用 close 来关闭连接;

我们使用 Python 来演示一下这个过程,首先是服务端:

import socket

# socket.socket() 会返回一个「主动套接字」
server = socket.socket(
    # 表示使用 IPv4,如果是 socket.AF_INET6
    # 则表示使用 IPv6
    socket.AF_INET,
    # 表示建立 TCP 连接,如果是 socket.SOCK_DGRAM
    # 则表示建立 UDP 连接
    socket.SOCK_STREAM
)
# 当然这两个参数也可以不传,因为默认就是它

# 设置套接字属性,这里让端口释放后立刻就能再次使用
server.setsockopt(socket.SOL_SOCKET,
                  socket.SO_REUSEADDR, True)

# 将「主动套接字」绑定在某个 IP 和端口上
server.bind(("localhost"12345))
# 监听,此时「主动套接字」会变成「监听套接字」
# 里面的参数表示 backlog,代表的含义后面说
server.listen(5)

# 调用 accept,等待客户端连接,此时会阻塞在这里
# 如果客户端连接到来,那么会返回「已连接套接字」,也就是这里的 conn
# 至于 addr 则是一个元组,保存了客户端连接的信息(IP 和端口)
conn, addr = server.accept()

# 下面我们通过「已连接套接字」conn 和客户端进行消息的收发
# 收消息使用 recv、发消息使用 send,和 read、write 本质是一样的
while True:
    msg = conn.recv(1024)
    # 当客户端断开连接时,msg 会收到一个空字节串
    if not msg:
        print("客户端已经断开连接")
        conn.close()
        break
    print("客户端发来消息:", msg.decode("utf-8"))
    # 然后我们加点内容之后,再给客户端发过去
    conn.send("服务端收到, 你发的消息是: ".encode("utf-8") + msg)

接下来编写客户端:

import socket

# 返回主动套接字
client = socket.socket(socket.AF_INET,
                       socket.SOCK_STREAM)
# 连接服务端
client.connect(("localhost"12345))
while True:
    # 发送消息
    data = input("请输入内容: ")
    if data.strip().lower() in ("q""quit""exit"):
        client.close()
        print("Bye~~~")
        break
    client.send(data.encode("utf-8"))
    print(client.recv(1024).decode("utf-8"))

启动服务端和客户端进行测试:

还是比较简单的,当然我们这里的服务端每次只能和一个客户端通信,如果想服务多个客户端的话,那么需要为已连接套接字单独开一个线程和客户端进行通信,然后主线程继续执行 accept 等待下一个客户端。

下面来编写一下多线程的版本,这里只需要编写服务端即可,客户端代码不变。

import socket
import threading

server = socket.socket()
server.setsockopt(socket.SOL_SOCKET,
                  socket.SO_REUSEADDR, True)
server.bind(("localhost"12345))
server.listen(5)


def handle_message(conn, addr):
    while True:
        msg = conn.recv(1024)
        if not msg:
            print(f"客户端(ip: {addr[0]}, port: {addr[1]}) 已经断开连接")
            conn.close()
            break
        print(f"客户端(ip: {addr[0]}, port: {addr[1]}) 发来消息:",
              msg.decode("utf-8"))
        conn.send("服务端收到, 你发的消息是: ".encode("utf-8") + msg)


while True:
    conn, addr = server.accept()
    threading.Thread(
        target=handle_message,
        args=(conn, addr)
    ).start()

代码很简单,就是把已连接套接字和客户端的通信逻辑写在了单独的函数中,每来一个客户端,服务端都会启动一个新的线程去执行该函数,然后继续监听,等待下一个客户端连接到来。

然后客户端代码不变,我们启动三个客户端去和服务端通信,看看结果如何。

结果一切正常,当然我们这里的代码比较简单,就是普通的消息收发。你也可以实现一个更复杂的功能,比如文件下载器,把服务端当成网盘,支持客户端上传和下载文件,并不难。

socketserver


另外 Python 标准库还提供了一个模块叫 socketserver,它是 socket 的更高级封装,可以简化服务端的代码逻辑。并且 socketserver 的内部会自动使用多线程,服务多个客户端。

import socketserver

# 自定义一个类,必须继承 BaseRequestHandler
class ServiceHandler(socketserver.BaseRequestHandler):
    """
    内部提供了三个重要属性
        self.request: 已连接套接字 conn
        self.client_address: 客户端信息 addr
        self.server: 服务端实例(一会我们会创建它)

    然后我们必须要实现 handle 方法,处理客户端连接时会自动调用
    此外还有两个方法,分别是 setup 和 finish,实不实现均可
    """


    def setup(self) -> None:
        """在执行 handle 之前调用,用于提前做一些连接相关的设置"""

    def finish(self) -> None:
        """在执行 handle 之后调用,用于资源释放等等"""
        self.request.close()

    def handle(self) -> None:
        """
        处理客户端连接
        这里的 self.request 就相当于之前的 conn
        """

        client_ip, client_port = self.client_address
        while True:
            msg = self.request.recv(1024)
            if not msg:
                print(f"客户端(ip: {client_ip}, port: {client_port}) 已经断开连接")
                self.request.close()
                break
            print(f"客户端(ip: {client_ip}, port: {client_port}) 发来消息:",
                  msg.decode("utf-8"))
            self.request.send("服务端收到, 你发的消息是: ".encode("utf-8") + msg)


# 绑定 IP 和端口,以及用于处理的 Handler
# 这里的 ThreadingTCPServer 实例就是 ServiceHandler 里面的 self.server
server = socketserver.ThreadingTCPServer(
    ("localhost"12346),
    ServiceHandler
)
# 开启无限循环,监听连接
server.serve_forever()
# 如果关闭监听,那么调用 server.shutdown()

可以测试一下,结果没有问题。并且当前支持多个客户端连接,每来一个客户端就会实例化一个 ServiceHandler,并开启多线程执行 handle 方法,与客户端通信。

以上我们就简单提了一下 socketserver,了解一下即可。

listen 方法的意义?

在创建完 socket 之后,我们调用了 listent 方法,该方法接收一个 backlog 参数。

server = socket.socket()
...
server.listen(5)

那么该方法的意义是什么呢?我们调用时传的数字 5 又有什么作用呢?

根据上面的 socket 流程图,我们可以得知在三次握手的时候,Linux 内核会维护两个队列:

  • 半连接队列,也称 SYN 队列;

  • 全连接队列,也称 Accept 队列;

服务端收到客户端发起的 SYN 请求后,内核会把该连接存储到半连接队列,并向客户端响应 SYN+ACK。接着客户端会回复 ACK,服务端收到后,内核会从半连接队列里面将连接取出,然后添加到全连接队列,等待进程调用 accept 函数时把连接取出来。

所以整个过程如下:

  • 1. 客户端发送 SYN 报文;

  • 2. 服务端将连接插入到半连接队列;

  • 3. 服务端向客户端返回 SYN + ACK;

  • 4. 客户端收到之后再向服务端返回 ACK;

  • 5. 服务端将连接从半连接队列中取出,移入全连接队列;

  • 6. 进程调用 accept 函数,从全连接队列中取出已完成连接建立的 socket 连接;

因此半连接队列(SYN 队列)用来存储 SYN_RECV 状态、未完成建立的连接;全连接队列(Accept 队列)用来存储 ESTABLISH 状态、已完成建立的连接。

而我们也可以很容易得出结论,客户端返回成功是在第二次握手之后,服务端 accept 成功是在三次握手之后,因为调用 accept 就相当于从全连接队列中取出连接和客户端进行通信。

那么如何查看 SYN 队列和 Accept 队列的大小呢?

  • net.ipv4.tcp_max_syn_backlog:查看半连接队列长度;

  • net.core.somaxconn:查看全连接队列的长度;

Linux 一切皆文件,如果想要修改队列大小的话,直接修改相应的文件即可。当然准确来说:

  • max(64, tcp_max_syn_backlog) 才是半连接队列的长度;

  • min(backlog, somaxconn) 才是全连接队列的长度,这里的 backlog 就是我们编写 socket 代码时,在 listen 方法里面指定的值。我们之前指定了 5,那么全连接队列的长度就是 5;

但是在服务端并发处理大量请求时,如果 TCP Accpet 队列过小,或者应用程序调用 accept 方法不及时,就会造成 Accpet 队列已满。这时后续的连接就会被丢弃,从而导致服务端请求数量上不去。

以上就是 listen 方法存在的意义,它接收一个 backlog 参数。如果觉得服务端支持的并发量不够,那么可以增大 backlog 的值。


非阻塞 I/O

先回顾一下 socket 模型:

但是注意:我们说在 listen() 这一步,会将主动套接字转化为监听套接字,但此时的监听套接字的类型是阻塞的。阻塞类型的监听套接字在调用 accept() 方法时,如果没有客户端来连接的话,就会一直处于阻塞状态,那么此时主线程就没法干其它事情了。

所以要设置为非阻塞,而非阻塞的监听套接字在调用 accept() 时,如果没有客户端来连接,那么主线程不会傻傻地等待,而是会直接返回,然后去做其它的事情。

类似的,我们在创建已连接套接字的时候默认也是阻塞的,阻塞类型的已连接套接字在调用 send() 和 recv() 的时候也会处于阻塞状态。比如当客户端一直不发数据的时候,已连接套接字就会一直阻塞在 recv() 这一步。如果是非阻塞类型的已连接套接字,那么当调用 recv() 但却收不到数据时,也不用处于阻塞状态,同样可以直接返回去做其它事情。

import socket

server = socket.socket()
server.bind(("localhost"12345))
# 调用 setblocking 方法,传入 False
# 表示将监听套接字和已连接套接字的类型设置为非阻塞
server.setblocking(False)
server.listen(5)

while True:
    try:
        # 非阻塞的监听套接字调用 accept() 时
        # 如果发现没有客户端连接,则会立刻抛出 BlockingIOError
        # 因此这里写了个死循环
        conn, addr = server.accept()
    except BlockingIOError:
        pass
    else:
        break

while True:
    try:
        # 同理,非阻塞的已连接套接字在调用 recv() 时
        # 如果发现客户端没有发数据,那么同样会报错
        msg = conn.recv(1024)
    except BlockingIOError:
        pass
    else:
        print(msg.decode("utf-8"))
        conn.send(b"data from server")

很明显,虽然上面的代码在运行的时候正常,但存在两个问题:

1)虽然 accept() 不阻塞了,在没有客户端连接时主线程可以去做其它事情,但如果后续有客户端连接,主线程要如何得知呢?因此必须要有一种机制,能够继续在监听套接字上等待后续连接请求,并在请求到来时通知主线程。

我们上面的做法是写了一个死循环,但很明显这是没有意义的,这种做法还不如使用阻塞的套接字。

2)send() / recv() 不阻塞了,相当于 I/O 读写流程不再是阻塞的,读写方法都会瞬间完成并返回,也就是说它会采用能读多少就读多少、能写多少就写多少的策略来执行 I/O 操作,这显然更符合我们对性能的追求。

但显然对于非阻塞套接字而言,会面临一个问题,那就是当我们执行读取操作时,有可能只读了一部分数据,剩余的数据客户端还没发过来,那么这些数据何时可读呢?同理写数据也是这种情况,当缓冲区满了,而我们的数据还没有写完,那么剩下的数据又何时可写呢?因此同样要有一种机制,能够在主线程做别的事情的时候继续监听已连接套接字,并且在有数据可读写的时候通知主线程。

这样才能保证主线程既不会像基本 IO 模型一样,一直在阻塞点等待,也不会无法处理实际到达的客户端连接请求和可读写的数据,而上面所提到的机制便是 I/O 多路复用。


I/O 多路复用

I/O 多路复用机制是指一个线程处理多个 IO 流,也就是我们经常听到的 select/poll/epoll,而 Linux 默认采用的是 epoll。

简单来说,在只运行单线程的情况下,该机制允许内核中同时存在多个监听套接字已连接套接字(套接字必须是非阻塞的)。内核会一直监听这些套接字上的连接请求或数据请求,一旦有请求到达就会交给主线程处理,这样就实现了一个线程处理多个 IO 流的效果。

上图就是基于多路复用的 IO 模型,我们以 epoll 为例。图中的 FD 是套接字,可以是监听套接字、也可以是已连接套接字,程序会通过 epoll 机制来让内核帮忙监听这些套接字。而此时主线程不会阻塞在某一个特定的套接字上,也就是说不会阻塞在某一个特定的客户端请求处理上。因此基于 epoll,服务端可以同时和多个客户端建立连接并处理请求,从而提升并发性。

但为了在请求到达时能够通知主线程,epoll 提供了基于事件的回调机制,即针对不同事件的发生,调用相应的处理函数。

那回调机制是怎么工作的呢?以上图为例,首先 epoll 一旦监测到 FD 上有请求到达,就会触发相应的事件。这些事件会被放进一个队列中,主线程对该事件队列不断进行处理,这样一来就无需一直轮询是否有请求发生,从而避免资源的浪费。

而在对事件队列中的事件进行处理时,会调用相应的处理函数,这就实现了基于事件的回调。因为主线程一直在对事件队列进行处理,所以能及时响应客户端请求,提升服务的响应性能。

比如连接请求和数据读取请求分别对应 Accept 事件和 Recv 事件,主线程分别对这两个事件注册 accept 和 recv 回调函数。当 Linux 内核监听到有连接请求或数据读取请求时,就会触发 Accept 事件或 Recv 事件,然后通知主线程执行 accept 函数或 recv 函数。

不好理解的话,举个通俗易懂的例子。比如小明要去怡红院,去找小红、小花和小翠,于是他问老鸨,这些姑娘来了没有啊,老鸨说没有。过一会小明又来问,这些姑娘来了没有啊,老鸨说没有。然后小明又问,这个过程就是在不断地轮询。最后老鸨无奈了,问小明:你要找这些姑娘做什么,等她们来了我通知你。

在这个例子中,小明相当于主线程,小红、小花和小翠就相当于套接字,老鸨相当于 epoll,负责监听这些套接字,并且可以同时监听很多个。如果她们来怡红院了,就说明套接字有事件发生了,老鸨就会通知小明,谁谁谁已经来了,你赶快做你想做的事情吧(相当于执行事件处理函数)。

比如小红来了,送她一只口红;小花来了,送她一朵玫瑰;小翠来了,送她一条手链。针对不同的事件执行相应的处理函数,而整个过程小明不需要一直轮询,它完全可以去做别的事情,当套接字有事件发生时,epoll 会通知他。

所以通过将非阻塞 I/O 和 I/O 多路复用技术搭配使用,在非阻塞 I/O 事件发生时,调用对应事件的处理函数,这种方式极大地提高了程序的健壮性和稳定性,是 Linux 下高性能网络编程的首选。

然后我们就来看看如何在 Python 里面使用 IO 多路复用,而且 IO 多路复用有多种,最常见的就是 select、poll 和 epoll,而它们之间又有什么区别呢?下面来一点一点介绍。


多路复用之 select

Python 有一个 select 模块,它内部有一个 select 函数,对应 select IO 多路复用。进程指定内核监听哪些文件描述符,当没有文件描述符事件发生时,进程被阻塞;当一个或者多个文件描述符事件发生时,进程被唤醒。

然后我们来看一下 select 函数。

import select

"""
select 函数接收四个参数
    rlist:一个列表,监听那些可读的 socket
    wlist:一个列表,监听那些可写的 socket
    xlist:一个列表,监听那些出错的 socket
    timeout:超时时间
"""

select.select()

这里需要特别指出的是,Linux 一切接文件,套接字也不例外。每一个套接字(文件)都有一个文件描述符(非负整数),用来标识唯一的套接字。如果用 C 实现多路复用,那么会以文件描述符作为参数,有了文件描述符,函数就能找到对应的套接字,进而进行监听、读写等操作。

但在 Python 里面,则是直接使用套接字本身作为参数,而不是使用文件描述符。当然啦,Python 的 select 也是封装了底层的 select。

然后我们来看一下如何使用 select。

import socket
import select
from queue import Queue
from typing import Dict

server = socket.socket()
server.bind(("localhost"12345))
server.setsockopt(socket.SOL_SOCKET,
                  socket.SO_REUSEADDR, True)
# 必须设置为非阻塞,IO 多路复用要搭配非阻塞套接字
server.setblocking(False)
server.listen(5)

# 以上我们就创建了监听套接字,它负责监听是否有客户端连接
# 所以当事件发生时,属于可读事件,代表客户端连接过来了
# 所以 server 应该放在 rlist 里面
rlist = [server]
wlist = []
xlist = rlist
# 因为可以监听多个套接字,所以 rlist、wlist、xlist 都是列表
# 但在初始状态下,select 只需要监听一个套接字(server)即可

message_queues: Dict[socket.socket, Queue] = {}

while True:
    # 开启 select 监听,返回三个元素
    # readable:   rlist 中发生可读事件的 socket
    # writeable:  wlist 中发生可写事件的 socket
    # exceptional:xlist 中发生异常的 socket
    readable, writeable, exceptional = select.select(rlist, wlist, xlist)

    # 遍历 readable
    for r in readable:
        # 如果 r is server,则代表监听套接字有事件发生
        # 显然是客户端连接来了
        if r is server:
            # 监听套接字是非阻塞的,那么已连接套接字默认也是非阻塞的
            # 当然你也可以调用 conn.setblocking(False) 显式地设置一下
            conn, addr = server.accept()
            print(f"和客户端 {addr[0]}:{addr[1]} 建立连接")
            # 将已连接套接字添加到 rlist 中,让它也被 select 监听
            # 当客户端发消息时,它会进入活跃状态,有事件发生
            # 然后被 select 监测到,放到 readable 中,这样遍历的时候就可以处理它了
            rlist.append(conn)
            # 由于客户端连接之后要发消息,那么我们是不是要将消息保存起来呢?
            message_queues[conn] = Queue()  # 负责保存后续客户端发的消息
        else:
            # 如果 r is not server,则代表是已连接套接字有事件发生
            # 说明是某个客户端发送消息了,我们要处理它
            data = r.recv(1024)
            if data:
                # 这里的 r 就是活跃的已连接套接字,调用它的 getpeername 方法
                # 也可以获取到客户端连接的 ip 和 端口
                addr = r.getpeername()
                print(f"收到客户端 {addr[0]}:{addr[1]} 发来的消息:",
                      f"`{data.decode('utf-8')}`")
                # message_queues 保存了所有的已连接套接字
                # 每一个套接字都对应一个队列,找到该活跃套接字对应的队列
                message_queues[r].put(data)  # 将消息放进去

                # 消息放进去了,服务端是不是也要回复呢?显然这属于可写事件
                # 我们还要将 r 放到 wlist 中,这样 select 就会监测到
                # 然后将它放到 writeable 中,我们遍历的时候就可以处理它了
                if r not in wlist:
                    wlist.append(r)
            else:
                # 走到这里说明 data 为假,说明客户端断开连接了,发来一个 b''
                addr = r.getpeername()
                print(f"客户端 {addr[0]}:{addr[1]} 已断开连接")
                # 我们要将套接字从 rlist、wlist 当中移除
                # 客户端都断开连接了,那么 select 也就不需要再监听了
                rlist.remove(r)
                if r in wlist:
                    wlist.remove(r)
                # 对了,还要将它从 message_queues 里面移除
                message_queues.pop(r)
                r.close()  # 关闭套接字连接

    # 以上是遍历可读事件,可读事件可以发生在监听套接字上面(和客户端建立连接)
    # 也可以发生在已连接套接字上面(客户端发信息了)
    # 如果没有事件发生或者处理完毕,那么接下来就要遍历可写事件
    # 而可写事件一定发生在已连接套接字上面(要回消息给客户端)
    for w in writeable:
        message_queue = message_queues[w]
        # 一开始队列里面肯定是有消息的,因为我们手动往里面放了一条
        # 但如果队列为空,说明服务端已经回复过了,那么要将该套接字从 wlist 里面移除
        # 让它变为非活跃状态,不再满足可写
        # 等到下一次客户端发消息时,再将它添加到 wlist 中,让它变得可写
        if message_queue.empty():
            wlist.remove(w)
            continue
        # 获取消息
        data = message_queue.get()
        # 发送给客户端
        w.send("服务端收到,你发的消息是: ".encode("utf-8") + data)

    # 然后遍历 xlist,如果在跟某个客户端通信的过程中,出现了错误
    # 那么将套接字从 rlist、wlist、xlist、message_queue 当中都删除
    # 然后再关闭套接字连接
    for x in exceptional:
        addr = x.getpeername()
        print(f"和客户端 {addr[0]}:{addr[1]} 通信出现错误")
        rlist.remove(x)
        if x in wlist:
            wlist.remove(x)
        message_queues.pop(x)
        x.close()

客户端代码还和之前一样,保持不变,然后来测试一下:

代码应该不难理解,但我们调用 select 背后都发生了什么呢?

1)上下文从用户态进入内核态,把要监听的文件描述符从用户空间拷贝到内核空间;

2)内核通过文件描述符找到所有的套接字,然后遍历,查看套接字是否有对应的事件发生;

3)如果没发生,那么让进程阻塞,当到达规定的超时时间(通过 select 函数的第四个参数指定,不设置则一直阻塞),再将进程唤醒。然后再次进行遍历,直到有事件发生(设备驱动产生中断)。这些都由内核帮我们完成;

4)当事件发生时,对套接字进行遍历,找到那些发生事件的套接字并返回,就是我们代码中的 readable、writeable、exptional;

5)程序对这些活跃的套接字进行处理;

上面的五个步骤就完成了一次 select 监听流程,但很明显我们要一直监听,所以写一个死循环。当 select 监听结束时,立刻开启下一轮 select 监听,因为服务是要不断运行的。

因此通过以上几个步骤不难看出,select 有两个致命的缺陷:

  • 每一次监听都要将所有的文件描述符拷贝到内核态,如果描述符非常多的话,这种拷贝会很耗时;

  • 当事件发生时,还要将所有的文件描述符都遍历一次,才能找到那些有事件发生的套接字。如果描述符非常多,遍历也需要时间;

然后 select 多路复用其实还有一个缺陷,就是它最多同时监听 1024 个文件描述符。

所以 select 多路复用有三个缺陷,因此在工作中我们很少用它。


多路复用之 poll

第二个要介绍的多路复用是 poll,相比 select,它的最大改进就是取消了最多同时监听 1024 个文件描述符这一限制,但其它的两个缺陷却没有得到改善。

多路复用,Windows 只支持 select,macOS 支持 select、poll,Linux 则是 select、poll 和 epoll 都支持。

下面先来简单看一下 poll,它在 select 模块里面是一个类。

import select
from select import POLLIN, POLLOUT, POLLERR, POLLHUP

# select.poll 是一个类
# 我们实例化一个 poll 对象
poll = select.poll()

# 给指定的套接字绑定事件
# 第一个参数可以是描述符,也可以是套接字
# 第二个参数是要绑定的事件
# POLLIN:可读事件(rlist)
# POLLOUT:可写事件(wlist)
# POLLERR:出现异常(xlist)
# POLLHUP:连接中断
poll.register(..., POLLIN | POLLERR)

# 取消某个套接字的事件绑定
poll.unregister(...)

有了 select 的经验,poll 应该不难理解,我们将上面使用 select 的服务端代码,用 poll 重写一下。

import socket
import select
from select import POLLIN, POLLOUT, POLLERR, POLLHUP
from typing import Dict
from queue import Queue

server = socket.socket()
server.bind(("localhost"12345))
server.setsockopt(socket.SOL_SOCKET,
                  socket.SO_REUSEADDR, True)
server.setblocking(False)
server.listen(5)

# 通过文件描述符找到套接字
fd2sk = {server.fileno(): server}
# 保存套接字接收到的客户端发来的消息
message_queues: Dict[int, Queue] = {}

# 实例化一个 poll 对象
poll = select.poll()
# 首先要对 server 进行注册,正如我们使用 select.select 时
# 要先将 server 放在 rlist 里面,然后事件为可读
# 不过由于可能发生错误,因此事件类型为 POLLIN | POLLERR
poll.register(server, POLLIN | POLLERR)

# 开启无限循环
while True:
    # poll 方法接收一个 timeout,不传则表示不设置超时
    # 它和 select.select 的第四个参数的含义相同
    # 当有事件发生时,会返回相应的文件描述符和事件
    events = poll.poll() # 正式开启监听
    # 我们进行遍历
    for fd, event in events:
        # 说明是监听套接字活跃了
        if fd == server.fileno():
            conn, addr = fd2sk[fd].accept()
            print(f"和客户端 {addr[0]}:{addr[1]} 建立连接")
            # 对 conn 进行注册,下一轮循环的时候也会对它进行监听
            # 这里可以传文件描述符、也可以传套接字
            # 如果传套接字,那么会自动调用 fileno 获取描述符
            poll.register(conn, POLLIN | POLLHUP | POLLERR)
            # 维护文件描述符到套接字的映射
            fd2sk[conn.fileno()] = conn
            # 为每个文件描述符维护一个队列,用于保存客户端发来的消息
            message_queues[conn.fileno()] = Queue()

        # 否则说明是已连接套接字有事件发生
        # 那么事件是可读还是可写呢?显然要通过 event 判断
        elif event & POLLIN:  # 可读
            data = fd2sk[fd].recv(1024)
            if data:  # 有数据
                addr = fd2sk[fd].getpeername()
                print(f"收到客户端 {addr[0]}:{addr[1]} 发来的消息:",
                      f"`{data.decode('utf-8')}`")
                # 客户端发消息,服务端也要回消息
                # 因此要给它注册一个可写事件
                poll.register(fd, POLLOUT | POLLHUP | POLLERR)
                # 然后将消息保存起来
                message_queues[fd].put(data)
            else:  # 客户端断开连接
                addr = fd2sk[fd].getpeername()
                print(f"客户端 {addr[0]}:{addr[1]} 已断开连接")
                # 取消监听,会将所有事件全部取消
                poll.unregister(fd)
                # 关闭连接
                fd2sk[fd].close()
                # 从字典中移除
                fd2sk.pop(fd)
        elif event & POLLOUT:  # 已连接套接字可写
            message_queue = message_queues[fd]
            if message_queue.empty():
                # 队列为空,说明消息已经发出去了
                # 那么套接字就不再可写了,因此要取消监听
                # 等到下一次客户端发消息时,再变得可写
                poll.unregister(fd)
                # 但 unregister 会取消所有事件的监听
                # 因此还要重新注册可读事件
                # 不然后续客户端发消息时,就无法处理了
                poll.register(fd, POLLIN | POLLHUP | POLLERR)
            else:
                data = message_queue.get()
                fd2sk[fd].send(
                    "服务端收到,你发的消息是: ".encode("utf-8") + data
                )
        elif event & POLLERR:  # 发生错误
            addr = fd2sk[fd].getpeername()
            print(f"和客户端 {addr[0]}:{addr[1]} 通信出现错误")
            poll.unregister(fd)
            fd2sk[fd].close()
            message_queues.pop(fd)

这段使用 poll 多路复用实现的服务端,和上面使用 select 多路复用实现的服务端,在效果上一模一样,可以测试一下。不过从使用上讲,poll 的话要方便一些。

然后是性能问题,poll 相比 select,只是改善了支持的最大描述符的数量,因此这两种多路复用基本都不用。

于是 Linux 内核在 2.4 的时候引入了 epoll,它是 IO 多路复用的终极形态,我们来看一下。


多路复用之 epoll

通过非阻塞 IO 和 IO 多路复用,单线程的服务端可以同时和多个客户端通信。我们给每个套接字绑定好相应的事件,然后让内核帮忙监听这些套接字,一旦有时间发生就及时通知。

目前使用的多路复用是 select 和 poll,但这两种多路复用存在性能问题。

1)每一次监听都要将文件描述符拷贝到内核空间,当描述符很多的时候,就会很耗时。那么有没有一种机制,只需要拷贝一次就好了呢?后续就交给内核来维护。虽然要交给内核,导致拷贝无法避免,但能不能不要每次都拷贝。

2)当有套接字活跃的时候,select 和 poll 会被唤醒,但它们醒来之后只知道有套接字活跃了,却不知道是哪些套接字活跃,只能挨个遍历所有的套接字。所以能不能有一种机制,负责告知活跃的套接字呢?

为了解决上面两个问题,Linux 引入了 epoll,这也是我们的重点。不过关于 epoll 的原理一会再说,先来看看如何在 Python 里面使用 epoll。

在使用上,epoll 和 poll 高度相似。

我们将之前的服务端代码,使用 epoll 重写一下。

import select
import socket
from queue import Queue
from select import (
    # epoll 和 poll 的用法相似
    # 把事件换成 epoll 的事件即可
    EPOLLIN,
    EPOLLOUT,
    EPOLLERR,
    EPOLLHUP
)
from typing import Dict

server = socket.socket()
server.bind(("localhost"12345))
server.setsockopt(socket.SOL_SOCKET,
                  socket.SO_REUSEADDR, True)
server.setblocking(False)
server.listen(5)

fd2sk = {server.fileno(): server}
message_queues: Dict[int, Queue] = {}

# 实例化一个 epoll 对象
epoll = select.epoll()
# 给 server 注册读事件
epoll.register(server, EPOLLIN | EPOLLERR)

while True:
    # 仍然是调用 poll 方法,开始轮询
    events = epoll.poll()
    for fd, event in events:
        if fd == server.fileno():
            conn, addr = fd2sk[fd].accept()
            print(f"和客户端 {addr[0]}:{addr[1]} 建立连接")
            # 给已连接套接字注册读事件
            # 第一个参数可以传文件描述符、也可以传套接字
            epoll.register(conn, EPOLLIN | EPOLLHUP | EPOLLERR)
            fd2sk[conn.fileno()] = conn
            message_queues[conn.fileno()] = Queue()

        elif event & EPOLLIN:  # 可读
            data = fd2sk[fd].recv(1024)
            if data:  # 有数据
                addr = fd2sk[fd].getpeername()
                print(f"收到客户端 {addr[0]}:{addr[1]} 发来的消息:",
                      f"`{data.decode('utf-8')}`")
                # 客户端发消息了,那么套接字要回复消息,因此满足可写
                # 但是和 poll 不同,epoll 只能给一个套接字注册一次
                # 而之前已经注册过一次了(已连接套接字创建时,注册了读事件)
                # 因此再注册就会报错,因为不能连续注册
                # 所以我们需要将 fd 上的事件取消掉,然后重新注册
                epoll.unregister(fd)
                # 重新注册,此时要同时注册读事件和写事件
                epoll.register(fd, EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLERR)
                message_queues[fd].put(data)
            else:
                # 客户端断开连接了
                addr = fd2sk[fd].getpeername()
                print(f"客户端 {addr[0]}:{addr[1]} 已断开连接")
                epoll.unregister(fd)
                fd2sk[fd].close()
                fd2sk.pop(fd)
                
        elif event & EPOLLOUT:  # 可写
            message_queue = message_queues[fd]
            if message_queue.empty():
                # 队列是空的,说明消息已经发走了,那么应该取消写事件
                # 做法也很简单:先将事件全部取消,然后重新注册读事件
                # 但也可以通过 modify 方法,直接修改事件类型
                epoll.modify(fd, EPOLLIN | EPOLLHUP | EPOLLERR)
            else:
                data = message_queue.get()
                fd2sk[fd].send(
                    "服务端收到,你发的消息是: ".encode("utf-8") + data
                )
        elif event & EPOLLERR:
            addr = fd2sk[fd].getpeername()
            print(f"和客户端 {addr[0]}:{addr[1]} 通信出现错误")
            epoll.unregister(fd)
            fd2sk[fd].close()
            message_queues.pop(fd)

代码和 poll 高度相似,但它的性能要比 select 和 poll 高很多。经过测试,epoll 在监听 10 个描述符的时候,耗时大概 0.4 秒,而 poll 耗时大概 0.6 秒,两者差别不是很大。但如果监听 10000 个描述符,那么 epoll 耗时大概 0.7 秒,而 poll 的耗时要 990 秒。

所以 epoll 的性能是碾压 select 和 poll 的,那么 epoll 的内部用了哪些黑魔法呢?我们来介绍一下。

首先必须要说明的是,Python select 模块里面的 epoll 实际上是封装了底层操作系统的 epoll,但是让使用变得更简单了,当然 select 和 poll 也是同理。所以接下来我们分析的是,操作系统提供的 epoll。

关于 epoll,操作系统提供了三个 API,分别是  epoll_create,epoll_ctl 和 epoll_wait。

epoll_create

调用 epoll_create 即可创建一个 epoll 实例,函数原型如下:

int epoll_create(int size);

该函数返回一个整型,也就是文件描述符,通过描述符可以找到相应的 epoll 实例。而 Python 在调用 select.epoll() 的时候,底层也会调用 epoll_create,只不过 Python 会封装成一个 epoll 对象再返回,我们直接操作内部的方法即可。

正如打开文件一样,底层在打开文件的时候也是返回一个描述符,而 Python 则是封装一个文件对象再返回。调用文件对象的方法,来操作文件,因为 Python 是面向对象的语言。

函数的 size 参数,在一开始的 epoll_create 实现中,是用来告知内核期望监控的文件描述符的数量,然后内核使用这部分的信息来初始化内核数据结构。但在新的实现中,这个参数不再被需要,因为内核可以动态分配需要的数据结构。我们在使用的时候,将 size 设置成一个大于 0 的整数就可以了。

epoll_ctl

在创建完 epoll 实例之后,可以通过调用 epoll_ctl 往这个 epoll 实例增加或删除监控的事件。函数 epll_ctl 函数如下:

int epoll_ctl(int epfd, int op, int fd, 
              struct epoll_event *event)
;

第一个参数 epfd 是 epoll 实例的描述符,也就是 epoll_create 的返回值。

第二个参数表示是注册事件、还是取消注册的事件,它有三个选项可供选择:

  • EPOLL_CTL_ADD: 给 epoll 实例注册事件;

  • EPOLL_CTL_DEL:取消给 epoll 实例注册事件

  • EPOLL_CTL_MOD: 修改给 epoll 实例注册的事件;

第三个参数表示套接字对应的文件描述符。

第四个参数表示注册的事件类型,有以下几种:

  • EPOLLIN:读事件;

  • EPOLLOUT:写事件;

  • EPOLLERR:出现错误;

  • EPOLLHUP:连接关闭;

  • EPOLLET:触发方式为边缘触发,默认为水平触发(一会解释);

所以该函数就等价于,Python 中 epoll 对象的 register 和 unregister 方法。

epoll_wait

该函数相当于 Python 里面 epoll 对象的 poll 方法,调用之后即可开启监听。

int epoll_wait(int epfd, struct epoll_event *events, 
               int maxevents, int timeout)
;

这种类型的 C 函数,一般返回的都是整型,用来表示成功还是失败。至于真正意义上的返回值,则在调用之前先声明好,然后将指针传进去,函数在内部修改指针指向的值。

所以这里的第二个参数,就相当于 Python 里面 poll 方法返回的 events,里面包含了套接字的描述符和发生的事件。

至于第一个参数,就是 epoll 实例的描述符。

第三个参数表示 epoll 可以处理的最大事件数量。

第四个参数则表示超时时间,不设置的话会一直等待,直到有事件发生。如果设置了,那么当抵达超时时间,无论有没有事件都会立即返回。然后进入下一轮循环,重新开启监听。

epoll 为什么高效?

说完了 epoll 的相关 API,那么我们来聊一聊它为什么高效?

首先是描述符的查找,我们在给某个描述符对应的套接字增加、删除和修改事件的时候,epoll 肯定要找到指定的描述符,那么 epoll 使用什么数据结构来管理这些描述符呢?

答案是红黑树,这是一个非常高效的数据结构,操作元素的时间复杂度为 O(logN),C++ 的 map 也是基于红黑树实现的。

然后再来看看 select 的三个缺点,epoll 是如何改善的,还记得 select 函数的三个缺点吗?

  • 每次调用 select.select() 或者 poll.poll() 的时候,都要将描述符从用户空间拷贝到内核空间,当描述符非常多时开销会很大;

  • 当有事件发生时,需要遍历所有的描述符,一个一个检测,才能知道是哪些套接字有事件发生。在描述符非常多时,开销同样很大;

  • select 支持的文件描述符太少了,默认是 1024;

首先第三个缺点不用多说,epoll 采用红黑树管理描述符,所以它能支持的描述符的数量非常多,就看操作系统能打开多少文件了。

然后是第一个缺点,epoll 在给套接字注册事件、也就是调用 epoll_ctl 的时候就会将其描述符拷贝到内核空间中,而不是等到监听的时候再拷贝。这样的话,每个描述符只需要拷贝一次即可。

最后是第二个缺点,epoll 是通过回调解决的。在给套接字注册事件时,还会为它绑定一个回调函数。当有事件发生时,就会调用该回调函数,将对应的描述符放在一个专门的就绪队列(由双向链表实现)里面,然后唤醒 epoll。

所以就绪队列里面的描述符对应的套接字都是活跃的,不在就绪队列里面则不活跃,也就是没有事件发生,这样就不需要遍历了。内核只需要将就绪队列里的描述符返回即可,并且这个过程还使用了 mmap,省略了拷贝的开销。

Linux 一切皆文件,套接字也不例外,而文件支持很多操作,比如我们熟知的 read, write, fsync, close 等等。但除了这些还有一个 poll 操作,它负责自定制事件的监听逻辑,事件发生时,将描述符添加到就绪队列这一逻辑就由 poll 操作实现。

所以 epoll 管理的描述符对应的文件必须支持 poll 操作,如果文件没有实现,那么它就无法被 epoll 管理。支持 poll 操作的最常见的文件种类就是套接字,而像我们平常使用的文件系统则是不支持的。

因此基于 epoll,单线程也能实现高并发,Redis 和 Nginx 已经证明了这一点。

水平触发和边缘触发

epoll 的工作模式有两种,分别是 LT 和 ET。

  • LT(level trigger):水平触发,当 epoll_wait 检测到描述符有事件发生,并将事件通知给应用程序时,应用程序可以不立即处理该事件。下次调用 epoll_wait 时,会再次响应并通知事件。

  • ET(edge trigger):边缘触发,当 epoll_wait 检测到描述符有事件发生,并将事件通知给应用程序时,应用程序必须立即处理该事件。如果不处理,下次调用 epoll_wait 时,就不会再通知了。

LT 是默认模式,但 ET模式在很大程度上减少了事件被重复触发的次数,因此效率要比 LT 模式高。


selectors:select 模块的封装

这里再补充一个 selectors 模块,它是 select 模块的一个封装,里面提供了多种 IO 多路复用器。但不管哪一种,它们的操作都是一样的,这样就实现了统一。

我们还是将之前的 server 端,重写一下。

import socket
from queue import Queue
# selectors 里面提供了多种"多路复用器"
# 除了 select、poll、epoll 之外
# 还有 kqueue,这个是针对 BSD 平台的
try:
    from selectors import (
        SelectSelector,
        PollSelector,
        EpollSelector,
        KqueueSelector
    )
except ImportError:
    pass
# 由于种类比较多,所以提供了 DefaultSelector
# 会根据当前的系统种类,自动选择一个合适的多路复用器
from selectors import (
    DefaultSelector,
    EVENT_READ,  # 读事件
    EVENT_WRITE,  # 写事件
)

server = socket.socket()
server.bind(("localhost"12345))
server.setsockopt(socket.SOL_SOCKET,
                  socket.SO_REUSEADDR, True)
server.setblocking(False)
server.listen(5)

message_queues = {}
# 实例化一个多路复用器
sel = DefaultSelector()

def accept(server: socket.socket):
    """和客户端建立连接"""
    conn, addr = server.accept()
    print(f"和客户端 {addr[0]}:{addr[1]} 建立连接")
    # 一旦建立连接,那么就要接收客户端消息
    # 所以我们要绑定事件,register 方法接收三个参数
    # 参数一:传一个套接字即可
    # 参数二:事件类型,这里是读事件
    # 参数三:事件发生时,执行的回调函数
    sel.register(conn, EVENT_READ, recv)
    # 表示当 conn 可读时,就去执行 recv 函数
    message_queues[conn] = Queue()

def recv(conn: socket.socket):
    """接收客户端数据"""
    data = conn.recv(1024)
    addr = conn.getpeername()
    if data:  # 有数据
        print(f"收到客户端 {addr[0]}:{addr[1]} 发来的消息:",
              f"`{data.decode('utf-8')}`")
        # 收到数据了,那么要给客户端回复,所以要绑定可写事件
        # 让事件可写,当事件发生时,执行 send 函数
        sel.modify(conn, EVENT_READ | EVENT_WRITE, send)
        message_queues[conn].put(data)
    else:
        print(f"客户端 {addr[0]}:{addr[1]} 已断开连接")
        conn.close()
        # 取消监听
        sel.unregister(conn)
        message_queues.pop(conn)

def send(conn: socket.socket):
    """给客户端发送数据"""
    message_queue = message_queues[conn]
    if message_queue.empty():
        # 队列为空说明已经发送过了,将事件改成可读
        # 继续监听客户端发来的消息
        sel.modify(conn, EVENT_READ, recv)
    else:
        data = message_queue.get()
        conn.send(
            "服务端收到,你发的消息是: ".encode("utf-8") + data
        )

# 给监听套接字注册可读事件
# 当有客户端连接,去执行 accept 函数
sel.register(server, EVENT_READ, accept)
# 在 accept 函数里面创建已连接套接字 conn
# 然后给 conn 绑定可读事件,当客户端发消息时,去执行 recv 函数
# 在 recv 函数里面给套接字绑定可写事件,然后去执行 send 函数

while True:
    # 内部会根据操作系统,选择一个合适的多路复用
    events = sel.select()
    # key 是一个 namedtuple
    # 内部有如下字段:'fileobj', 'fd', 'events', 'data'
    # key.fd 就是套接字的文件描述符
    # key.fileobj 则是套接字本身
    # key.data 是给套接字绑定的回调
    # key.event 则是事件
    for key, mask in events:
        # 事件发生时,获取回调,然后调用
        # 回调显然就是这里的 accept、recv、send
        callback = key.data 
        callback(key.fileobj) 

过 selectors 模块我们将服务端重新实现了,效果和之前是一样的。整个过程就是给 socket 绑定一个事件 + 回调,当事件发生就会告知我们。但是调用回调不是内核自动完成的,而是由我们手动完成的。"非阻塞 + 回调 + 基于 IO 多路复用的事件循环",所有框架基本都是这个套路。

需要说明的是,这种写法的性能非常高,Redis 和 Nginx 都是基于这种方式实现了高并发,但它和我们传统的同步代码的写法大相径庭。如果是同步代码,那么会先建立连接、然后接收数据、再发送数据,这显然更符合我们人类的思维,逻辑自上而下,非常自然。

但是多路复用加回调的方式,就让人很不适应,我们在建立完连接之后,不能直接收数据,必须将接收数据的逻辑放在一个单独的函数(方法)中,然后再将这个函数以回调的方式注册进去。

同理,在接收完数据之后,也不能立刻发送。同样要将发送数据的逻辑放在一个单独的函数中,然后再以回调的方式注册进去。

所以好端端的自上而下的逻辑,因为回调而被分割的四分五裂,这种代码在编写和维护的时候是非常痛苦的。

比如回调可能会层层嵌套,容易陷入回调地狱,如果某一个回调执行出错了怎么办?代码的可读性差导致不好排查,即便排查到了也难处理。

另外,如果多个回调需要共享一个变量该怎么办?为回调是通过事件循环调用的,在注册回调的时候很难把变量传过去。简单的做法是把该变量设置为全局变量,或者说多个回调都是某个类的成员函数,然后把共享的变量作为一个属性绑定在 self 上面。但当逻辑复杂时,就很容易导致全局变量满天飞的问题。

所以这种模式就使得开发人员在编写业务逻辑的同时,还要关注并发细节。

因此使用回调的方式编写异步化代码,虽然并发量能上去,但是对开发者很不友好;而使用同步的方式编写同步代码,虽然很容易理解,可并发量却又上不去。那么问题来了,有没有一种办法,能够让我们在享受异步化带来的高并发的同时,又能以同步的方式去编写代码呢?也就是我们能不能以同步的方式去编写异步化的代码呢?

答案是可以的,使用「协程」便可以办到。协程在这种模式的基础之上又批了一层外衣,兼顾了开发效率与运行效率。

关于协程,我们以后再聊。


小结

以上我们就聊了聊 Python 的 socket,以及多路复用是什么,它在 Python 里面该如何使用。

我们在工作中应该不会直接使用这些东西,但了解一下总归是好的,而且它也为我们后面学习协程打下基础。


本文参考自:

  • 极客时间:《网络编程实战》

  • 极客时间:《Redis 核心技术与实战》

  • https://www.cnblogs.com/Anker/p/3265058.html

  • 小林coding《图解网络》

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多