用 500 行 Golang 代码实现高性能的消息回调中间件
zamia
·
2017年09月24日
·
最后由 lithium4010 回复于 2017年09月25日
·
6523 次阅读
用500行 Golang 代码实现高性能的消息回调中间件本文描述了如何实现一个消息回调中间件,得益于 golang 管道和协程的编程思想,通过巧妙的设计,只需要约500行代码就可以实现高性能、优雅关闭、自动重连等特性,全部代码也已经开源在 github/fishtrip/watchman。 问题随着业务复杂度的增加,服务拆分后服务数量不断增加,异步消息队列的引入是必不可少的。当服务较少的时候,比如业务早期,很多时候就是一个比较大的单体应用或者少量几个服务,消息队列(之后写做 MQ,Message Queue)的使用方法如下:
相对来说,发送端比较简单,消费端比较复杂,需要处理的逻辑比较多。比如目前我们公司使用的 sneakers 需要处理如下的逻辑:
而随着服务增多,如果每个需要消费消息的服务都部署一个这样的后台进程显然不够环保:
那有没有更好的办法呢? 其中一般办法是打造一个全局的、高性能的消息回调中间件,中间件来负责队列的管理、消息的收发、重试以及出错处理,这样就不再需要每个服务去考虑诸如消息丢失、消息重试等问题了,基本解决了上面的缺点。具体这个消息回调中心应该具备哪些功能呢?
这样的话,每个服务的工作就变得轻量了很多。本文的目的就是来实现一版生产环境可用的消息回调中间件。当然,我们第一版的回调中心也不需要太多功能,有如下的限制:
基本的需求有了,如何实现一个这样的消息回调中间件呢? 解决思路开发语言选择Golang 作为「系统级开发语言」,非常适合开发这类中间件。内建的 goroutine/channel 机制非常容易实现高并发。而作为 Golang 新手,这个项目也不复杂,很适合练手和进一步学习。 消息可靠性关于重试和出错处理呢?我们从 Sneakers 的实现中借鉴了它的方法,通过利用 RabbitMQ 内置的机制,也就是通过 x-dead-letter 机制来保证消息在重试时可以做到高可靠性,具体可以参考前段时间我写的这篇文章。简单总结一下文中的思路:
这里面有两个地方利用了 RabbitMQ 的 Dead-Letter 机制:
通过这种机制,可以保证在进行消息处理的时候,不管是正常、还是出错时,消息都不会丢失。关于这里进一步的细节可以参考上面的文章。 实现高并发对于中间件,性能的要求比较高,性能也包含两个方面:低延迟和高并发。低延迟在这个场景中我们无法解决,因为一个消息回调之后的延迟是其他业务服务决定的。所以我们更多的是追求高并发。 如何获得高并发?首先是开发语言的选择,这类底层的中间件很适合用 Golang 来实现,为什么呢?因为回调中心的主逻辑就是不断回调各个服务,而各个服务的延迟时间中间件无法控制,所以如果想获得高并发,最好是使用异步事件这种机制。而借助于 Golang 内置的 Channel ,既可以获得接近于异步事件的性能,又可以让整个开发变得简单高效,是一个比较合适的选择。 具体实现呢?其实对于一个回调中心来说,大概分成这么几个步骤:
通过消息这么一个「实体」可以把所有上面的流程串联起来,是不是很像 pipeline ?而 pipeline 的设计模式是 Golang 非常推荐的实现高并发的方式。上面的每一个步骤可以看做一组协程(goroutine),他们之间通过管道通信,因此不存在资源竞争的情况,大大降低了开发成本。 而上面每个步骤可以通过设计不同的 Goroutine 模型来实现高并发:
上面四个步骤,我们用了三种协程的设计模型,细化一下上面的图就是这个样子的。
实现有了上面的设计过程,代码并不复杂,大概分为几部分:配置管理、主流程、消息对象、重试逻辑以及优雅关闭等的实现。详细的代码放在 github:fishtrip/watchman 配置管理配置管理这部分,这个版本我们实现的比较简单,就是读取 yml 配置文件。配置文件主要包含的主要是三部分信息:
我们使用 yaml.v2 包可以很方便的解析 yaml 配置文件到 struct 之中,比如对于 queue 的定义,struct 实现如下:
上面之所以需要一个 ProjectConfig 的指针,主要是为了方便读取 project的配置,因此加载的时候需要把队列指向 project。
上面代码中有个地方容易出错,就是在 for 循环内部设置指针的时候不能直接使用 queue 变量,因为此时获取的 queue 变量是一份复制出来的数据,并不是原始数据。 另外,config.go 中大部分逻辑是按照面向对象的思考方式来书写的,比如:
通过这种方式,可以写出更清晰可维护的代码。 消息对象封装整个程序中,在 channel 中传递的数据都是 Message 对象,通过这种对象封装,可以非常方便的在各种类型的 Goroutine 之间传递数据。 Message 类的定义如下:
我们把 RabbitMQ 中的原生消息以及队列信息、回调结果封装在一起,通过这种方式把 Message 对象在管道之间传递。同时 Message 封装了众多的方法来供其他协程方便的调用。
注意上面方法的接收对象,带指针的接收对象表示会修改对象的值。 主流程主流程就是我们上面说的,通过 pipeline 的模式、把消息的整条流程串联起来。最核心的代码在这里:
上面每个函数都接收相同的管道定义,因此可以串联使用。其实每个函数的实现区别不大,不同的协程模型可能需要不同的写法。 下面是 receiveMessage 的写法,并进行了详细的注释。revceiveMessage 对每个消息队列都生成了 N 个协程,然后把读取的消息全部写入管道。
里面有几个关键点需要注意。
总结得益于 Golang 的高效的表达能力,通过大约 500 行代码实现了一个稳定的消息回调中间件,同时具备下面的特性:
当然,我们团队目前还都是 golang 新手,也没有做太多的单元测试和性能测试,下一步可能会继续优化,完善测试工作,并且优化配置的管理,欢迎各位去 github 围观源码。
共收到 13 条回复
看前后的差异只是把消费者从消息处理进程换成了 web 进程,这里并没有节约资源啊。本来我只要解析简单的消息,现在要过整个 web 栈。 没有问题 创造问题也要上
对
lithium4010
#1
回复
任何问题都是基于特定场景,考虑一下有 10 个服务、甚至上百个服务的情况,是每个服务带一套消息处理的机制、还是设计一个回调中心? 请问 是解决了每个服务带一套消息处理的机制 还是 每个服务都需要一个消息处理进程? 消息处理机制是由RabbitMQ调度,不知是否理解错
对
lithium4010
#5
回复
我们也是用 sneaker,但是考虑一下每个服务(十多个)都配一套 sneaker 就显得没有必要了;另外,我们也有一些 java 的服务,也要配置一个类似于 sneaker 的东西。有了回调中心之后,sneaker 就可以不再使用了,实现相关的回调接口就可以了。当然回调接口中可以再把工作丢给 sidekiq 也行。
对
pathbox
#4
回复
你说的是同一回事,每个服务自带一套消息处理机制,具体的实现基本上就是独立的进程(像 sneaker)或者 java 服务里面单开一套线程池。
对
zamia
#8
回复
明白了。真正的worker 处理,还是sneaker或sidekiq中处理吗?还是直接实现回调接口,就没sneaker 或sidekiq什么事了 @pathbox 我的理解是这个设计将下面几项单独出来:
这个设计优势会体现在使用不同的语言, 都需要使用与 MQ 之间交互, 并且大家都需要 "合理的重试机制", 保证只要消息没有成功处理, 就停留在 MQ 中. 但劣势也很明显, 每一个任务都需要具体的服务执行方提供一个结果 成功否 返回. 现在的方案中是 HTTP 的同步方案. 虽然 Golang 的 goroutine 能支撑起连接数, 但是这样的同步回调机制对服务执行方的接入会有场景限制, 那些执行时间特别长的应该不太愿意接入(想象一个 http 请求连着 5 分钟等返回). 如果考虑将这个中间件设计成为一个与外部系统异步交互的方式, 使用端应该更乐意接入. 从另外一个角度来考虑, RabbitMQ 本身是作为多个服务之间使用消息传递解耦而存在的中间件, 如果可以将这些 "重用" 的机制以扩展或者插件的方式实现到现有语言(java, ruby, golang) 与 RabbitMQ 交互的代码中, 也不失为一种方案. 这样少一层抽象少一层理解. @zamia ruby 和 golang 配合很是强大 提供两个 tips:
对
wppurking
#10
回复
赞👍,解释的很专业。 你说的慢连接的问题目前我们是从规范上来限制,对于执行时间长的请求,一律扔到 sidekiq 这类任务系统里面去处理,http 请求快速返回。 cony 我也去看看,目前重连我实现的也比较原始;golang dep 我也想用,不过看着还没有正式发版,就先用 glide 了。 |
|