分享

rocketmq消峰之流量控制详解

 HUC王子 2021-09-03

在这里插入图片描述
rocketMq消费端消费分数以上三个步骤:
第一: 消费端从rocketMq服务端pull消息,到本地。
第二: 消费端消费pull到的消息。
第三: 消费消费结束后,回复Ack到rocketMq,偏移消费位置。

代码:

 /**
     * 测试mq 并发 接受
     */
    @Component
    @RocketMQMessageListener(topic = ConstantTopic.WRITING_LIKE_TOPIC,selectorExpression = ConstantTopic.WRITING_LIKE_ADD_TAG, consumerGroup = "writing_like_topic_add_group")
    class ConsumerLikeSave implements RocketMQListener<LikeWritingParams>, RocketMQPushConsumerLifecycleListener{

        @SneakyThrows
        @Override
        public void onMessage(LikeWritingParams params) {
            System.out.println("睡上10s");
            //Thread.sleep(10000);

            long begin = System.currentTimeMillis();
            System.out.println("mq消费速度"+Thread.currentThread().getName()+"  "+DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now()));
            //writingLikeService.saveLike2Db(params.getUserId(),params.getWritingId());
            long end = System.currentTimeMillis();
          //  System.out.println("消费:: " +Thread.currentThread().getName()+ "毫秒:"+(end - begin));
        }

        @Override
        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
            defaultMQPushConsumer.setConsumeThreadMin(2); //消费端拉去到消息以后分配线索去消费
            defaultMQPushConsumer.setConsumeThreadMax(10);//最大消费线程,一般情况下,默认队列没有塞满,是不会启用新的线程的
            defaultMQPushConsumer.setPullInterval(1000);//消费端多久一次去rocketMq 拉去消息
            defaultMQPushConsumer.setPullBatchSize(32);     //消费端每个队列一次拉去多少个消息,若该消费端分赔了N个监控队列,那么消费端每次去rocketMq拉去消息说为N*1
            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
            defaultMQPushConsumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(2);
        }
    }

第一:从上图分析可得知影响消费速度的几种情况:

1.PullInterval: 设置消费端,拉取mq消息的间隔时间。
注意:该时间算起时间是rocketMq收到Ack后算起。
例如:PullInterval=10s,上次rocketMq服务收到Ack消息后开始算起10s后,在拉去消息,不包含消费端消费占用的时间。

2.PullBatchSize: 设置每次pull消息的数量,该参数设置是针对逻辑消息队列,并不是每次pull消息拉到的总消息数,若上图右边消费端分配了两个消费队列来监听。那么PullBatchSize 设置为32,那么该消费端每次pull到 64个消息。

消费端每次pull到消息总数=PullBatchSize*监听队列数

3.ThreadMin和ThreadMax: 消费端消费pull到的消息需要的线程数量。

以上三种情况:是针对参数配置,来调整消费速度。

除了这三种情况外还有两种服务部署情况,可以调整消费速度:

4.rocketMq 逻辑消费队列配置数量 有消费端每次pull到消息总数=PullBatchSize*监听队列数
可知rocketMq 逻辑消费队列配置数量即上图中的 queue1 ,queue2,配置数量越多每次pull到的消息总数也就越多。如果下边配置读队列数量:
在这里插入图片描述

5.消费端节点部署数量 :部署数量无论一个节点监听所有队列,还是多个节点按照分配策略分配监听队列数量,理论上每秒pull到的数量都一样的,但是多节点消费端消费线程数量要比单节点消费线程数量多,也就是多节点消费速度大于单节点。

第二:从上边分析可得知,控制消费速度可以分为并发和延迟消费两种方案控制流量。

前提:rocketMq 主题topic 配置的消费队列数量一定的情况下讨论,这里设置为queueNum=4个消费队列
1.并发控流:
1.1 部署节点数量 nodeNum
1.2 每个节点消费端线程数量 threadNum
注释:一般情况最小线程和最大线程设置为一样,并且一般情况最大线程不会用到原因见线程池,触发线程次队列塞满才会启动新的线程,
1.3 每次pull的消息间隔时间 pullTime
1.4 批量拉去的数量PullBatchSize

列如上图配置: 部署节点数量为2 ,每个节点消费端线程数量为2 ,每次pull的消息间隔时间为1s,
PullBatchSize 为32

我们先分析一个节点每秒拉取消息数量:

pullTime*PullBatchSize *queueNum=TotalNum

1*32*2=64

此时具体消费速度就看消费端线程数量和接口执行时间来决定64个消息需要多久消费完。

若接口每次执行速度为50毫秒,那么1s两个线程可以执行40个消息,那么该节点处理完64个需要时间为:64/40 s 大约就是1.6s

若配置消费端开启三个线程,那么就是每秒消费64个消息时绰绰有余。

一般情况下pullTime默认为0,即消费速度完全就有消费端线程数量和接口执行速度(其他外在因素不考虑)两个因素来决定。

那么我们来分析下pullTime为0的情况:
一个接口执行时间为50ms,那么1秒一个线程处理20个消息,那么开启10个线程该节点每秒能处理200个线程,开启100个线程那就是2000个消息…

两个节点就是4000个消息这已经超出了mysql能承受的压力,其实这里达到mysql的连接不一定有4000个,毕竟mysql前 边还有一个数据库连接池在控制连接数量间接控制执行速度。

每秒处理消息数量=1s/接口执行时间毫秒*线程数量。

结论:pullTime=0 的情况下,并发控制实际就是调整节点部署数量和消费端消费线程数量,并且要预估每条消息业务处理时间(预估接口执行时间)

2.消费延时控流:

针对消息订阅者的消费延时流控的基本原理是,每次消费时在客户端增加一个延时来控制消费速度,此时理论上消费并发最快速度为:

单节点部署:
ConsumInterval :延时时间单位毫秒
ConcurrentThreadNumber:消费端线程数量
MaxRate :理论每秒处理数量
MaxRate = 1 / ConsumInterval * ConcurrentThreadNumber

如果消息并发消费线程(ConcurrentThreadNumber)为 20,延时(ConsumInterval)为 100 ms,代入上述公式可得

200 = 1 / 0.1 * 20

由上可知,理论上可以将并发消费控制在 200 以下

如果是多个节点部署如两个节点,理论消费速度最高为每秒处理400个消息。

如下延时流控代码:

 /**
     * 测试mq 并发 接受
     */
    @Component
    @RocketMQMessageListener(topic = ConstantTopic.WRITING_LIKE_TOPIC,selectorExpression = ConstantTopic.WRITING_LIKE_ADD_TAG, consumerGroup = "writing_like_topic_add_group")
    class ConsumerLikeSave implements RocketMQListener<LikeWritingParams>, RocketMQPushConsumerLifecycleListener{

        @SneakyThrows
        @Override
        public void onMessage(LikeWritingParams params) {
            System.out.println("睡上0.1秒");
            Thread.sleep(100);

            long begin = System.currentTimeMillis();
            System.out.println("mq消费速度"+Thread.currentThread().getName()+"  "+DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now()));
            //writingLikeService.saveLike2Db(params.getUserId(),params.getWritingId());
            long end = System.currentTimeMillis();
          //  System.out.println("消费:: " +Thread.currentThread().getName()+ "毫秒:"+(end - begin));
        }

        @Override
        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
            defaultMQPushConsumer.setConsumeThreadMin(20); //消费端拉去到消息以后分配线索去消费
            defaultMQPushConsumer.setConsumeThreadMax(50);//最大消费线程,一般情况下,默认队列没有塞满,是不会启用新的线程的
            defaultMQPushConsumer.setPullInterval(0);//消费端多久一次去rocketMq 拉去消息
            defaultMQPushConsumer.setPullBatchSize(32);     //消费端每个队列一次拉去多少个消息,若该消费端分赔了N个监控队列,那么消费端每次去rocketMq拉去消息说为N*1
            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
            defaultMQPushConsumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(2);
        }
    }

注释:如上消费端,单节点每秒处理速度也就是最高200个消息,实际上要小于200,业务代码执行也是需要时间。

但是要注意实际操作中并发流控实际是默认存在的,
spring boot 消费端默认配置
this.consumeThreadMin = 20;
this.consumeThreadMax = 20;
this.pullInterval = 0L;
this.pullBatchSize = 32;

若业务逻辑执行需要20ms,那么单节点处理速度就是:1/0.02*20=1000
这里默认拉去的速度1s内远大于1000

注意: 这里虽然pullInterval 等于0 当时受限于每次拉去64个,处理完也是需要一端时间才能回复ack,才能再次拉取,所以消费速度应该小于1000

所以并发流控要消费速度大于消费延时流控 ,那么消费延时流控才有意义

总结:rocketMq 肖锋流控两种方式:
并发流控:就是根据业务流控速率要求,来调整topic 消费队列数量(read queue),消费端部署节点,消费端拉去间隔时间,消费端消费线程数量等,来达到要求的速率内

延时消费流控:就是在消费端延时消费消息(sleep),具体延时多少要根据业务要求速率,和消费端线程数量,和节点部署数量来控制

阿里实例:

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多