分享

Redis实现可靠低延时的消息队列

 WindySky 2018-02-26

最近项目遇到库存的问题,其实我是抵触的,因为在业务比较小的时候,设置一张表来表示实际库存和锁定库存就好了,但是呢,业务发展就是这样,保不准啥时候就突然增长了,你连反应都没有,系统就挂了。

如果用现在的消息队列,rabbitMQ或者activeMQ ,对于库存这样的数据操作,这个延时性,又是没有办法忍受的,所以想着自己redis实现一个MQ试试,因为redis还是支持原子性操作的,incrBy和decrBy在更新数据的时候,是一个不错的选择。

使用redis的List来实现消息队列,网上很多例子,这里就不详细说明了,其实我这里最主要的是,消息的可靠性处理。

关于唯一性表示的key的存储的integer的值,我使用了双key存储,使用空间来换取时间,主key存值,附属key存有效时间,每次读取主key值的时候读取附属key值来判断时间是否快要过期,如果快要过期就expire一下时间,这样的主key能够实现热点数据存储,还能防止缓存穿透。

不过因为使用的是decrBy会导致一种情况出现,当前库存还剩1个,2个线程同时请求,一个请求减去1个库存,另一个请求减去2个库存,,如果减去2个库存的先执行,他会返回一个-1,然后我会加回去,但是在加回去的之前,减1库存的线程执行了,会返回-2,依然没有办法减库存成功,所以在这中情况下,我采用当减库存返回负数时,在一秒内每隔50毫秒轮询查询一次数据,如果查询回来值大于等于要减去的值,就可以减去,减的时候也得判断一下返回是不是正数。一秒还是没有获取到能够减库存,就返回没有库存。

然后到了发队列信息的步骤,我使用的是lpush,发布成功就OK,不成功,直接存入数据库,然后返回。

队列消费使用每秒轮询的方式,轮询出数据,批量处理多线程处理,定义一个线程池,每次进入定时的时候去看线程池正在运行的线程,算出可以使用的线程,乘以每个线程要批处理的数目,就是这次定时需要读取的数目,使用rpoppush来读取List数据并进行备份,防止取出就断电。

下一秒定时来的时候,依然查看线程池可以使用的线程,这样来达到服务器每秒最大处理笔数的限制。尽量减少数据库等待。

剩下的就是备份库存的处理,这些备份库存可以单独定时处理,不过这个定时尽量在备份时的后一段时候再执行,lpop就行,取出来,判断是否已经处理过。

凌晨或者量小的时候再去处理真正的库存操作,在库存请求的时候如果库存为0 则进行一次库存更新,需要加锁。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多