分享

一百万个 WebSockets 连接 和 Go

 技术的游戏 2023-05-23 发布于广东

最近在研究 Websockets,文章来源于 Sergey Kamardin,是 Mail.Ru 的一名开发人员。

这篇文章是关于我们如何使用 Go 开发高负载 WebSocket 服务的。

如果您熟悉 WebSockets,但对 Go 知之甚少,希望您在性能优化的思想和技术方面仍然对本文感兴趣。

1. 简介

为了定义我们故事的上下文,应该说几句关于为什么我们需要这个服务器。

Mail.Ru 网站有很多状态系统。用户电子邮件存储就是其中之一。有几种方法可以跟踪系统内的状态变化——以及系统事件。这主要是通过定期系统轮询或关于其状态更改的系统通知。

两种方式各有利弊。但在邮件方面,用户收到新邮件的速度越快越好。

邮件轮询涉及每秒约 50,000 个 HTTP 查询,其中 60% 返回 304 状态,这意味着邮箱没有变化。

因此,为了减少服务器的负载并加快向用户发送邮件的速度,决定重新发明轮子,编写发布-订阅服务器(也称为总线、消息代理或事件- channel) ,一方面会接收有关状态更改的通知,另一方面会接收此类通知的订阅。

之前:

现在:

第一个方案显示了它之前的样子。浏览器定期轮询 API 并询问存储(邮箱服务)更改。

第二个方案描述了新的架构。浏览器与通知 API 建立 WebSocket 连接,通知 API 是总线服务器的客户端。收到新电子邮件后,Storage 会向 Bus (1) 发送有关它的通知,然后将 Bus 发送给其订阅者 (2)。API 确定连接以发送收到的通知,并将其发送到用户的浏览器 (3)。

所以今天我们将讨论 API 或 WebSocket 服务器。展望未来,我告诉你服务器将有大约 300 万在线连接。

2. 惯用的方式

让我们看看我们如何在没有任何优化的情况下使用普通的 Go 功能来实现我们服务器的某些部分。

在我们继续之前net/http,让我们谈谈我们将如何发送和接收数据。位于 WebSocket 协议之上的数据(例如 JSON 对象)在下文中将被称为数据包

让我们开始实现Channel将包含通过 WebSocket 连接发送和接收此类数据包的逻辑的结构。

2.1. 通道结构

// Packet represents application level data.
type Packet struct {
...
}

// Channel wraps user connection.
type Channel struct {
conn net.Conn // WebSocket connection.
send chan Packet // Outgoing packets queue.
}

func NewChannel(conn net.Conn) *Channel {
c := &Channel{
conn: conn,
send: make(chan Packet, N),
}

go c.reader()
go c.writer()

return c
}

WebSocket 通道实现。

我想提请您注意两个读写 goroutines 的启动。每个 goroutine 都需要自己的内存堆栈,其初始大小可能为 2 到 8 KB,具体取决于操作系统和 Go 版本。

对于上面提到的 300 万个在线连接数,我们将需要24 GB 的内存(加上 4 KB 的堆栈)用于所有连接。Channel这还没有为结构、传出数据包和其他内部字段分配的内存ch.send

2.2. I/O 协程

让我们看一下reader的实现:

func (c *Channel) reader() {
// We make a buffered read to reduce read syscalls.
buf := bufio.NewReader(c.conn)

for {
pkt, _ := readPacket(buf)
c.handle(pkt)
}
}

Channel 的读取 goroutine。

在这里,我们使用 来bufio.Reader减少系统调用的数量read(),并在缓冲区大小允许的范围内读取尽可能多的数据buf。在无限循环中,我们期待新数据的到来。请记住这句话:期待新数据的到来。我们稍后会返回给他们。

我们将把传入数据包的解析和处理放在一边,因为它对我们将要讨论的优化并不重要。然而,buf现在值得我们注意:默认情况下,它是 4 KB,这意味着我们的连接还有12 GB的内存。writer也有类似的情况:

func (c *Channel) writer() {
// We make buffered write to reduce write syscalls.
buf := bufio.NewWriter(c.conn)

for pkt := range c.send {
_ := writePacket(buf, pkt)
buf.Flush()
}
}

Channel 的写入 goroutine。

我们遍历传出数据包通道c.send并将它们写入缓冲区。正如我们细心的读者已经猜到的那样,这是用于我们 300 万个连接的另外 4 KB 和12 GB内存。

2.3. HTTP

我们已经有了一个简单的Channel实现,现在我们需要获得一个 WebSocket 连接才能使用。由于我们仍在惯用方式标题下,让我们以相应的方式进行。

注意:如果你不知道 WebSocket 是如何工作的,应该提到客户端通过一种称为升级的特殊 HTTP 机制切换到 WebSocket 协议。成功处理升级请求后,服务器和客户端使用 TCP 连接交换二进制 WebSocket 帧。这里是对连接内部的帧结构的描述。

import (
"net/http"
"some/websocket"
)

http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) {
conn, _ := websocket.Upgrade(r, w)
ch := NewChannel(conn)
//...
})

升级到 WebSocket 的惯用方式。

请注意,为初始化和进一步的响应http.ResponseWriter写入分配内存bufio.Readerbufio.Writer均有 4 KB 缓冲区)*http.Request

无论使用什么 WebSocket 库,在成功响应升级请求后,服务器都会收到I/O 缓冲区以及调用后的 TCP 连接responseWriter.Hijack()

提示:在某些情况下,go:linkname可用于通过调用net/http.putBufio{Reader,Writer}将缓冲区返回到net/http内的sync.Pool

因此,我们需要另外24 GB的内存来支持 300 万个连接。

因此,还没有执行任何操作的应用程序总共需要72 GB的内存!

3. 优化

让我们回顾一下我们在介绍部分讨论的内容并记住用户连接的行为方式。切换到 WebSocket 后,客户端发送一个包含相关事件的数据包,或者换句话说,订阅事件。然后(不考虑诸如连接之类的技术消息ping/pong),客户端可能在整个连接生命周期内不发送任何其他内容。

连接寿命可能持续几秒到几天。

因此,在大多数情况下,我们Channel.reader()正在Channel.writer()等待处理接收或发送的数据。与它们一起等待的是每个 4 KB 的 I/O 缓冲区。

现在很明显,某些事情可以做得更好,不是吗?

3.1. Netpoll

您还记得Channel.reader()实现通过锁定bufio.Reader.Read()内的conn.Read()调用来预期新数据的到来吗?如果连接中有数据,Go 运行时会“唤醒”我们的 goroutine 并允许它读取下一个数据包。之后,goroutine 在等待新数据时再次被锁定。让我们看看 Go 运行时如何理解 goroutine 必须被“唤醒”。

如果我们查看conn.Read() 实现,我们将在其中看到net.netFD.Read() 调用:

// net/fd_unix.go

func (fd *netFD) Read(p []byte) (n int, err error) {
//...
for {
n, err = syscall.Read(fd.sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN {
if err = fd.pd.waitRead(); err == nil {
continue
}
}
}
//...
break
}
//...
}

了解有关非阻塞读取的内部知识。

Go 在非阻塞模式下使用套接字。EAGAIN 表示套接字中没有数据并且不会在从空套接字读取时被锁定,操作系统将控制权返回给我们。

我们read()从连接文件描述符中看到一个系统调用。如果读取返回EAGAIN 错误,运行时将调用 pollDesc.waitRead():

// net/fd_poll_runtime.go

func (pd *pollDesc) waitRead() error {
return pd.wait('r')
}

func (pd *pollDesc) wait(mode int) error {
res := runtime_pollWait(pd.runtimeCtx, mode)
//...
}

了解有关 netpoll 用法的内部信息。

如果我们深入挖掘,我们会发现 netpoll在 Linux 中使用epoll实现,在 BSD 中使用 kqueue实现。为什么不对我们的连接使用相同的方法?我们可以分配一个读取缓冲区并仅在真正需要时才启动读取 goroutine:当套接字中确实有可读数据时。

在 github.com/golang/go 上,有导出 netpoll 函数的问题。

3.2. 摆脱协程

假设我们有Go 的netpoll 实现。现在我们可以避免Channel.reader()使用内部缓冲区启动 goroutine,并订阅连接中可读数据的事件:

ch := NewChannel(conn)

// Make conn to be observed by netpoll instance.
poller.Start(conn, netpoll.EventRead, func() {
// We spawn goroutine here to prevent poller wait loop
// to become locked during receiving packet from ch.
go Receive(ch)
})

// Receive reads a packet from conn and handles it somehow.
func (ch *Channel) Receive() {
buf := bufio.NewReader(ch.conn)
pkt := readPacket(buf)
c.handle(pkt)
}

使用netpoll

使用更容易,Channel.writer()因为我们可以运行 goroutine 并仅在我们要发送数据包时分配缓冲区:

func (ch *Channel) Send(p Packet) {
if c.noWriterYet() {
go ch.writer()
}
ch.send <- p
}

仅在需要时启动 writer goroutine。

请注意,我们不处理操作系统返回EAGAIN系统write()调用的情况。对于这种情况,我们依靠 Go 运行时,因为这种服务器实际上很少见。然而,如果需要,它可以以相同的方式处理。

从(一个或多个)读取传出数据包后ch.send,编写器将完成其操作并释放 goroutine 堆栈和发送缓冲区。

完美的作品!通过去除两个连续运行的 goroutine 中的堆栈和 I/O 缓冲区,我们节省了48 GB 。

3.3. 资源控制

大量的连接不仅涉及高内存消耗。在开发服务器时,我们经历了反复出现的竞争条件和死锁,随后经常出现所谓的自我 DDoS——应用程序客户端疯狂地尝试连接到服务器从而进一步破坏服务器的情况。

例如,如果由于某种原因我们突然无法处理ping/pong消息,但是空闲连接的处理程序继续关闭此类连接(假设连接断开因此没有提供数据),客户端似乎每隔 N 秒就失去连接并尝试再次连接而不是等待事件。

如果锁定或过载的服务器只是停止接受新连接,并且它之前的平衡器(例如,nginx)将请求传递给下一个服务器实例,那就太好了。

此外,无论服务器负载如何,如果所有客户端出于任何原因突然要向我们发送数据包(可能是由于错误原因),之前保存的48 GB将再次使用,因为我们实际上将回到初始状态每个连接的 goroutine 和缓冲区。

协程池

我们可以使用 goroutine 池限制同时处理的数据包数量。这就是这种池的简单实现:

package gopool

func New(size int) *Pool {
return &Pool{
work: make(chan func()),
sem: make(chan struct{}, size),
}
}

func (p *Pool) Schedule(task func()) error {
select {
case p.work <- task:
case p.sem <- struct{}{}:
go p.worker(task)
}
}

func (p *Pool) worker(task func()) {
defer func() { <-p.sem }
for {
task()
task = <-p.work
}
}

goroutine 池的简单实现。

现在我们的代码netpoll如下所示:

pool := gopool.New(128)

poller.Start(conn, netpoll.EventRead, func() {
// We will block poller wait loop when
// all pool workers are busy.
pool.Schedule(func() {
Receive(ch)
})
})

在 goroutine 池中处理轮询器事件。

所以现在我们不仅在套接字中出现可读数据时读取数据包,而且在第一次有机会占用池中的空闲 goroutine 时读取数据包。

同样,我们将更改Send()

pool := gopool.New(128)

func (ch *Channel) Send(p Packet) {
if c.noWriterYet() {
pool.Schedule(ch.writer)
}
ch.send <- p
}

复用编写 goroutine。

而不是go ch.writer(),我们想写在一个重用的 goroutines 中。因此,对于一个 goroutines 池N,我们可以保证在N请求同时处理和到达N + 1时,我们不会为N + 1读取分配缓冲区。goroutine 池还允许我们限制Accept()Upgrade()新连接并避免大多数 DDoS 情况。

3.4. 零拷贝升级

让我们稍微偏离一下 WebSocket 协议。如前所述,客户端使用 HTTP 升级请求切换到 WebSocket 协议。这是它的样子:

GET /ws HTTP/1.1
Host: mail.ru
Connection: Upgrade
Sec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA==
Sec-Websocket-Version: 13
Upgrade: websocket

HTTP/1.1 101 Switching Protocols
Connection: Upgrade
Sec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4=
Upgrade: websocket

HTTP 升级示例。

也就是说,在我们的例子中,我们只需要 HTTP 请求及其标头来切换到 WebSocket 协议。这些知识和存储在其中的内容http.Request表明,为了优化,我们可能会在处理 HTTP 请求时拒绝不必要的分配和复制,并放弃标准服务器net/http

例如,http.Request包含一个具有同名 Header 类型的字段,通过将数据从连接复制到值字符串,该字段无条件地填充所有请求标头。想象一下这个字段中可以保留多少额外数据,例如用于大型 Cookie 标头。

但是要得到什么回报呢?

WebSocket 实现

不幸的是,在我们优化服务器时存在的所有库都允许我们只对标准net/http服务器进行升级。此外,这两个(两个)库都无法使用上述所有读写优化。为了使这些优化起作用,我们必须有一个相当低级的 API 来处理 WebSocket。要重用缓冲区,我们需要协议函数如下所示:

func ReadFrame(io.Reader) (Frame, error)
func WriteFrame(io.Writer, Frame) error

如果我们有一个带有此类 API 的库,我们可以按如下方式从连接中读取数据包(数据包写入看起来是一样的):

// getReadBuf, putReadBuf are intended to 
// reuse *bufio.Reader (with sync.Pool for example).
func getReadBuf(io.Reader) *bufio.Reader
func putReadBuf(*bufio.Reader)

// readPacket must be called when data could be read from conn.
func readPacket(conn io.Reader) error {
buf := getReadBuf()
defer putReadBuf(buf)

buf.Reset(conn)
frame, _ := ReadFrame(buf)
parsePacket(frame.Payload)
//...
}

预期的 WebSocket 实现 API。

简而言之,是时候创建我们自己的库了。

github.com/gobwas/ws

从意识形态上讲,编写该ws库是为了不将其协议操作逻辑强加给用户。所有读写方法都接受标准io.Readerio.Writer接口,这使得使用或不使用缓冲或任何其他 I/O 包装器成为可能。

除了来自标准的升级请求外net/httpws还支持零拷贝升级、处理升级请求和切换到 WebSocket 而无需内存分配或复制。ws.Upgrade()接受io.ReadWriternet.Conn实现这个接口)。换句话说,我们可以使用标准net.Listen()并将接收到的连接从ln.Accept()immediately 转移到ws.Upgrade(). 该库可以复制任何请求数据以供将来在应用程序中使用(例如,Cookie验证会话)。

下面是升级请求处理的基准net/http:标准服务器与net.Listen()零拷贝升级:

BenchmarkUpgradeHTTP    5156 ns/op    8576 B/op    9 allocs/op
BenchmarkUpgradeTCP 973 ns/op 0 B/op 0 allocs/op

切换到ws零拷贝升级为我们节省了另外24 GB——处理程序处理请求时为 I/O 缓冲区分配的空间net/http

3.5. 概括

让我们构建我告诉过你的优化。

  • 内部有缓冲区的读取 goroutine 是昂贵的。解决方案:netpoll(epoll, kqueue);重用缓冲区。

  • 内部带有缓冲区的写 goroutine 是昂贵的。解决方案:必要时启动goroutine;重用缓冲区。

  • 在连接风暴中,netpoll 将无法工作。解决方案:在数量限制下重用 goroutines。

  • net/http不是处理升级到 WebSocket 的最快方法。解决方案:在裸 TCP 连接上使用零拷贝升级。

这就是服务器代码的样子:

import (
"net"
"github.com/gobwas/ws"
)

ln, _ := net.Listen("tcp", ":8080")

for {
// Try to accept incoming connection inside free pool worker.
// If there no free workers for 1ms, do not accept anything and try later.
// This will help us to prevent many self-ddos or out of resource limit cases.
err := pool.ScheduleTimeout(time.Millisecond, func() {
conn := ln.Accept()
_ = ws.Upgrade(conn)

// Wrap WebSocket connection with our Channel struct.
// This will help us to handle/send our app's packets.
ch := NewChannel(conn)

// Wait for incoming bytes from connection.
poller.Start(conn, netpoll.EventRead, func() {
// Do not cross the resource limits.
pool.Schedule(func() {
// Read and handle incoming packet(s).
ch.Recevie()
})
})
})
if err != nil {
time.Sleep(time.Millisecond)
}
}

具有 netpoll、goroutine 池和零拷贝升级的示例 WebSocket 服务器。

4. 结论

过早的优化是编程中万恶之源(或者至少是万恶之源)。唐纳德高德纳

当然,上述优化是相关的,但并非在所有情况下。例如,如果可用资源(内存、CPU)与在线连接数之间的比率相当高,则可能没有优化的意义。但是,如果知道需要改进的地方和需要改进的地方,您会受益匪浅。

感谢您的关注!

5. 参考资料

  • https://github.com/mailru/easygo

  • https://github.com/gobwas/ws

  • https://github.com/gobwas/ws-examples

  • https://github.com/gobwas/httphead

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多