分享

java手写消息队列和延迟消息队列

 笑笑兔 2023-09-13

前言

“消息队列”给人感觉很高级的样子,其实我们了解后发现,跟普通技术一样,熟悉后也能很快上手。

队列使用场景

1、商品秒杀

秒杀活动,短时间内出现爆发式用户请求,如果不采取措施,导致服务器假死甚至宕机。如果加上消息队列,服务器接到用户请求后,先存入消息队列再排队处理,这样不会同时处理多个请求情况;如果队列过长超过承载数量,可以抛弃当前用户请求,通知用户“排队中.......”提示。

2、系统解耦

把系统业务功能模块化,实现系统解耦。例如,当用户注册后,反馈注册成功。随后新需求,注册完需要短信提示功能;接着增加用户经验值功能,哪天又去了增加经验。反反复复,肯定受不了。

这个时候,我们想到使用消息队列实现系统解耦,每个功能独立开发,只是订阅或取消订阅的状态可以了。当需要功能时,打开订阅功能队列,反之就关闭。

常用消息中间件RabbitMQ

目前比较常用MQ中间件有RabbitMQ、kafka、RocketMQ,轻量级的有Redis提供的消息队列,简单说一下RabbitMQ队列。

Rabbitmq重要概念:生产者、消费者、代理

生产者:消息的创建者,负责创建和推送数据到消息服务器。
  • 消费者:消息的接收方,用于处理数据和确认消息。

    代理:也就是 RabbitMQ 服务本身,它用于扮演“快递”的角色,因为它本身并不生产消息,只是扮演了“快递”的角色,把消息进行暂存和传递。

    运行流程:


    RabbitMQ优点:
  • 支持持久化,RabbitMQ 支持磁盘持久化功能,保证了消息不会丢失;

  • 高并发,RabbitMQ 使用了 Erlang 开发语言,Erlang 是为电话交换机开发的语言,天生自带高并发光环和高可用特性;

  • 支持分布式集群,正是因为 Erlang 语言实现的,因此 RabbitMQ 集群部署也非常简单,只需要启动每个节点并使用 --link 把节点加入到集群中即可,并且 RabbitMQ 支持自动选主和自动容灾;

  • 支持多种语言,比如 Java、.NET、PHP、Python、JavaScript、Ruby、Go 等;

  • 支持消息确认,支持消息消费确认(ack)保证了每条消息可以被正常消费;

  • 它支持很多插件,比如网页控制台消息管理插件、消息延迟插件等,RabbitMQ 的插件很多并且使用都很方便。

RabbitMQ消息类型:

  • direct(默认类型)模式,此模式为一对一的发送方式,也就是一条消息只会发送给一个消费者;

  • headers 模式,允许你匹配消息的 header 而非路由键(RoutingKey),除此之外 headers 和 direct 的使用完全一致,但因为 headers 匹配的性能很差,几乎不会被用到;

  • fanout 模式,为多播的方式,会把一个消息分发给所有的订阅者;

  • topic 模式,为主题订阅模式,允许使用通配符(#、*)匹配一个或者多个消息,我可以使用“cn.mq.#”匹配到多个前缀是“cn.mq.xxx”的消息,比如可以匹配到“cn.mq.rabbit”、“cn.mq.kafka”等消息。

自定义消息队列

public class CustomQueue {
    // 定义消息队列
    private static Queue<String> queue = new LinkedList<>();

    public static void main(String[] args) {
        producer(); // 调用生产者
        consumer(); // 调用消费者
    }

    // 生产者
    public static void producer() {
        // 添加消息
        queue.add("first message.");
        queue.add("second message.");
        queue.add("third message.");
    }

    // 消费者
    public static void consumer() {
        while (!queue.isEmpty()) {
            // 消费消息
            System.out.println(queue.poll());
        }
    }
}

自定义队列添加延迟,实现Delayed接口,重写getDelay()方法:

 // 延迟消息队列
    private static DelayQueue delayQueue = new DelayQueue();

    public static void main(String[] args) throws InterruptedException {
        producer(); // 调用生产者
        consumer(); // 调用消费者
    }

    // 生产者
    public static void producer() {
        // 添加消息
        delayQueue.put(new MyDelay(1000, "消息1"));
        delayQueue.put(new MyDelay(3000, "消息2"));
    }

    // 消费者
    public static void consumer() throws InterruptedException {
        System.out.println("开始执行时间:" +
                DateFormat.getDateTimeInstance().format(new Date()));
        while (!delayQueue.isEmpty()) {
            System.out.println(delayQueue.take());
        }
        System.out.println("结束执行时间:" +
                DateFormat.getDateTimeInstance().format(new Date()));
    }

class MyDelay implements Delayed {
        // 延迟截止时间(单位:毫秒)
        long delayTime = System.currentTimeMillis();

        // 借助 lombok 实现
        @Getter
        @Setter
        private String msg;

        /**
         * 初始化
         * @param delayTime 设置延迟执行时间
         * @param msg       执行的消息
         */
        public MyDelay(long delayTime, String msg) {
            this.delayTime = (this.delayTime + delayTime);
            this.msg = msg;
        }

        // 获取剩余时间
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        // 队列里元素的排序依据
        @Override
        public int compareTo(Delayed o) {
            if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
                return 1;
            } else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
                return -1;
            } else {
                return 0;
            }
        }

        @Override
        public String toString() {
            return this.msg;
        }
    }

总结

队列使用场景:秒杀活动、系统解耦。常用消息中间件RabbitMQ优点、消息类型。

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多