分享

消息中间件的应用场景与 RabbitMQ的六种工作模式介绍

 邸彦强 2023-04-10 发布于河北

消息中间件应用场景

消息中间件的实际应用场景有哪些?主要有三种,分别为异步处理、应用解耦、流量削峰。

异步处理

可以理解为非阻塞处理,作用提高系统响应速度与吞吐量,增加用户体验。
举个例子,领导要求开发的投诉小程序,所有接口响应需要在300ms内完成。大于300ms需要优化,有这样的一个需求:
1、投诉内容需要在投诉系统进行落盘存储(整个流程需要150ms)2、投诉系统需要把通过接口把投诉内容传给大数据进行沉淀(200ms)整个投诉环节形成闭环需要350ms,不能满足响应需求。在这里插入图片描述
只能调整技术方案,加入消息中间件(rabbitmq)总体响应时间为150ms,满足需求,如图
在这里插入图片描述

应用解耦

解耦的前提是要有耦合,什么是耦合? 耦合可以理解为耦合度,耦合度是指模块或者应用之间的依赖关系,关系越多耦合度就越高,系统的复杂度就越高(不满足架构设计的三大原则之一简单原则),那就意味着需要解耦,解耦的目的就是提高系统的独立性,方便日后开发维护。
举个例子:还是投诉系统,1、投诉系统需要把投诉数据推送到大数据中心进行沉淀汇聚;2、需要调用短信系统进行实时发送处理信息给相关工作人员(您有新的投诉待处理,请快速登录系统处理);3、系统的产生的日志(接口日志、sql日志)需要调用系统管理系统进行存储。如图
在这里插入图片描述
以上系统会存在一个问题,假如投诉后台调用第三方(短信、大数据、日志)系统用的同步处理机制,而不是多线程异步任务机制,此时短信系统宕机了,投诉后台就会返回失败。系统健壮性太差了,必须优化,增加消息中间件。如图
在这里插入图片描述

投诉后台把投诉内容写入mq后,返回成功,其他业务系统通过订阅mq进行各自业务处理即可,就算短信服务宕机也不影响其他系统。这样一来,增强了系统的健壮性、提高的系统的qps、系统也完美进行了解耦,方便维护。

流量削峰

我们经常看到这样的一个场景,比如双十一0点抢购,或者政府优惠券8点开枪,系统的高峰访问量可能就是持续1到2个小时,之后就恢复平静或者空闲,如何保证高并发的情况下系统正常运作呢?
我们都知道服务器的资源或者处理能力都是有限的,都有最大的qps,对于系统的瞬间海量的请求冲击,服务器可能忙不过来或者直接宕机,很多人就会增加服务器数量来提高系统的高可用,来满足高并发,但是在空闲的时候有没有什么请求,这样一来导致资源浪费,首先肯定的是技术方案是可行的,但是成本太高了,太烧钱了,不是每个老板都是大方的。
所以采用mq进行削峰更优更省钱,其实mq削峰的本质就是延缓请求处理,借用网上一张图
在这里插入图片描述

RabbitMQ的六种工作模式

RabbitMQ主要有六种工作模式,通过整合SpringBoot分别介绍工作模式的实现。

简单模式

简单模式也称为点对点模式 。
方式为:生产者->队列->消费者 ,生产者与消费者之间直接通过队列名称匹配,没有Exchange交换机。
应用场景:单机应用直接消息传递,后续不会在扩展新的服务或者更多的队列,比如投诉系统的日志写入Mq,日志采集系统进行读取存储。
在这里插入图片描述
代码实例

SimpleConfig配置

/**
 * 直连模式只需要声明队列,所有消息都通过队列转发。
 * 无需设置交换机
 */
@Configuration
public class SimpleConfig {

	@Bean
	public Queue setQueue() {
		return new Queue('logQueue');
	}
}

生产者

//simpleSend 直连模式
	@ApiOperation(value='simpleSend发送接口',notes='直接发送到队列')
	@GetMapping(value='/simpleSend')
	public Object simpleSend(String message) throws AmqpException, UnsupportedEncodingException {
		//设置部分请求参数
		MessageProperties messageProperties = new MessageProperties();
		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
		//发消息
		rabbitTemplate.send('logQueue',new Message(message.getBytes('UTF-8'),messageProperties));
		return 'message sended : '+message;
	}

消费者

//直连模式的多个消费者,会分到其中一个消费者进行消费。类似task模式
	//通过注入RabbitContainerFactory对象,来设置一些属性,相当于task里的channel.basicQos
	@RabbitListener(queues='logQueue')
	public void helloWorldReceive(String message) {
	     System.out.println('simpleSend模式 received message : ' +message);
	}

工作模式

工作模式可以理解为竞争消费模式,类似nginx的负载均衡轮询模式。
方式为:生产者->队列->多个消费者,与简单模式不同的是其具有多个消费者,可理解为一对多的关系。
应用场景:集群部署中的消息传递,举个例子,投诉系统与日志系统都实现集群高可用,投诉系统生产了10条日志数据,会循环发送消息给订阅的消费者,3个消费者(日志管理系统)读取到消息累加总数为10。如图所示:
在这里插入图片描述

代码实例
工作模式配置

@Configuration
public class WorkConfig {
    
    //声明队列
    @Bean
    public Queue workQ1() {
        return new Queue('workLogQueue');
    }

}

生产者

//工作队列模式
	@ApiOperation(value='workqueue发送接口',notes='发送到所有监听该队列的消费')
	@GetMapping(value='/workqueueSend')
	public Object workqueueSend(String message) throws AmqpException, UnsupportedEncodingException {
		MessageProperties messageProperties = new MessageProperties();
		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
		//制造多个消息进行发送操作
		for (int i = 0; i <10 ; i++) {
			rabbitTemplate.send('workLogQueue',  new Message((message+i).getBytes('UTF-8'),messageProperties));
		}
		return 'message sended : '+message;
	}

消费者

//工作队列模式
    @RabbitListener(queues='workLogQueue')
    public void wordQueueReceiveq1(String message) {
		System.out.println('工作队列模式1 received message : ' +message);
    }

    @RabbitListener(queues='workLogQueue')
    public void wordQueueReceiveq2(String message) {
		System.out.println('工作队列模式2 received message : ' +message);
    }

结果
在这里插入图片描述

发布订阅模式

发布订阅模式也称之为广播模式,生产者每一条消息都需要发送给订阅的消费者,和工作模式都是一对多的关系,不同的是工作模式是每条消息只能发送给一个消费者。
方式为:生产者->交换机->队列(多个)->消费者(多个)
应用场景:AI智能系统通过AI摄像头实时监控景区的客流人数,采集现场图片进行分析是否有火灾,从而产生告警事件(客流事件、火灾事件),第三方系统(管控系统、物联系统)都需要实时采集AI系统的告警事件进行各自业务处理。
在这里插入图片描述
代码实例
配置

/**
 * Fanout模式需要声明exchange,并绑定queue,由exchange负责转发到queue上。
 * 广播模式 交换机类型设置为:fanout
 */
@Configuration
public class FanoutConfig {

	//声明队列
	@Bean
	public Queue fanoutQ1() {
		return new Queue('fanout.keliu');
	}
	@Bean
	public Queue fanoutQ2() {
		return new Queue('fanout.huozai');
	}


	//声明exchange
	@Bean
	public FanoutExchange setFanoutExchange() {
		return new FanoutExchange('fanoutExchange');
	}


	//声明Binding,exchange与queue的绑定关系
	@Bean
	public Binding bindQ1() {
		return BindingBuilder.bind(fanoutQ1()).to(setFanoutExchange());
	}
	@Bean
	public Binding bindQ2() {
		return BindingBuilder.bind(fanoutQ2()).to(setFanoutExchange());
	}

}

生产者

	// pub/sub 发布订阅模式   交换机类型 fanout
	@ApiOperation(value='fanout发送接口',notes='发送到fanoutExchange。消息将往该exchange下的所有queue转发')
	@GetMapping(value='/fanoutSend')
	public Object fanoutSend(String message) throws AmqpException, UnsupportedEncodingException {
		MessageProperties messageProperties = new MessageProperties();
		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
		//fanout模式只往exchange里发送消息。分发到exchange下的所有queue
		rabbitTemplate.send('fanoutExchange', '', new Message(message.getBytes('UTF-8'),messageProperties));
		return 'message sended : '+message;
	}

消费者

//pub/sub模式进行消息监听
	@RabbitListener(queues='fanout.keliu')
	public void fanoutReceiveq1(String message) {

	    System.out.println('发布订阅模式1received message : ' +message);
	}
	@RabbitListener(queues='fanout.huozai')
	public void fanoutReceiveq2(String message) {
	    System.out.println('发布订阅模式2 received message : ' +message);
	}

代码运行结果
在这里插入图片描述

路由模式

路由模式就是可以依据routingKey选择性的投递消息
方式为:生产者->交换机->routingKey->队列->消费者,可以理解为生产者发送的消息先到交换机,交换机依据routingKey找到匹配的队列进行发送消息,然后消费者依据交换机及routingKey进行消费。个人理解其实相当于引入二级分类(一级为交换机、二级为routingKey),目的是为了更加精准或者让消息颗粒度更细的投递。
应用场景::AI智能系统通过AI摄像头实时监控景区的客流人数,采集现场图片进行分析是否有火灾,从而产生告警事件(客流事件、火灾事件),第三方管控系统订阅AI系统的客流告警事件,而物联系统对于客流告警事件及或者告警事件都需要订阅。如图所示
在这里插入图片描述

代码实例
配置

/*
   路由模式|Routing模式   交换机类型:direct
*/
@Configuration
public class DirectConfig {

	//声明队列
	@Bean
	public Queue directQ1() {
		return new Queue('direct.keliu');
	}
	@Bean
	public Queue directQ2() {
		return new Queue('direct.huozai');
	}


	//声明exchange
	@Bean
	public DirectExchange setDirectExchange() {
		return new DirectExchange('directExchange');
	}

	//声明binding,需要声明一个routingKey
	@Bean
	public Binding bindDirectBind1() {
		return BindingBuilder.bind(directQ1()).to(setDirectExchange()).with('guankong');
	}
	
	@Bean
	public Binding bindDirectBind2() {
			return BindingBuilder.bind(directQ1()).to(setDirectExchange()).with('wulian');
	}
	@Bean
	public Binding bindDirectBind3() {
		return BindingBuilder.bind(directQ2()).to(setDirectExchange()).with('wulian');
	}
}

生产者

//routing路由工作模式  交换机类型 direct
	@ApiOperation(value='direct发送接口',notes='发送到directExchange。exchange转发消息时,会往routingKey匹配的queue发送')
	@GetMapping(value='/directSend')
	public Object routingSend(String routingKey,String message) throws AmqpException, UnsupportedEncodingException {

		if(null == routingKey) {
			routingKey='guankong';
		}
		MessageProperties messageProperties = new MessageProperties();
		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
		//direct模式只往exchange里发送消息。分发到exchange下所有绑定routingKey的queue
		rabbitTemplate.send('directExchange', routingKey, new Message(message.getBytes('UTF-8'),messageProperties));
		return 'message sended : routingKey >'+routingKey+';message > '+message;
	}

消费者

   //Routing路由模式
    @RabbitListener(queues='direct.keliu')
    public void routingReceiveq1(String message) {
	    System.out.println('Routing路由模式客流事件 received message : ' +message);
    }

    @RabbitListener(queues='direct.huozai')
    public void routingReceiveq2(String message) {
	    System.out.println('Routing路由模式火灾事件 received message : ' +message);
    }

运行结果
http://localhost:9001/directSend?message=%E6%B5%8B%E8%AF%95routing%E8%B7%AF%E7%94%B1%E5%B7%A5%E4%BD%9C%E6%A8%A1%E5%BC%8F&routingKey=wulian
在这里插入图片描述

主题模式

主题模式与路由模式类似,也是将消息路由到Routingkey与BindingKey匹配的队列中,但它不是完全匹配,而是模糊匹配。说白了就是路由模式是设置特定的routingKey绑定唯一的队列,而主题模式的是使用通配符匹配一个或者多个队列。
生产者->交换机->routingKey(模糊匹配)->队列->消费者
通配符有两种,*和#,

  • 表示可以匹配一个。
    #表示可以匹配多个
    应用场景:应用场景::AI智能系统通过AI摄像头实时监控景区的客流人数,采集现场图片进行分析是否有火灾,从而产生告警事件(客流事件、火灾事件),第三方管控系统订阅AI系统的客流告警事件,而物联系统对于客流告警事件及或者告警事件都需要订阅。如图
    在这里插入图片描述

代码实例
配置

/*
Topics模式  交换机类型 topic
* */
@Configuration
public class TopicConfig {

	//声明队列
	@Bean
	public Queue topicQ1() {
		return new Queue('topic.keliu');
	}
	@Bean
	public Queue topicQ2() {
		return new Queue('topic.huozai');
	}


	//声明exchange
	@Bean
	public TopicExchange setTopicExchange() {
		return new TopicExchange('topicExchange');
	}

	//声明binding,需要声明一个roytingKey
	@Bean
	public Binding bindTopicHebei1() {
		return BindingBuilder.bind(topicQ1()).to(setTopicExchange()).with('#.guankong.#');
	}
	@Bean
	public Binding bindTopicHebei2() {
		return BindingBuilder.bind(topicQ2()).to(setTopicExchange()).with('#.wulian.#');
	}

}

生产者

//topic 工作模式   交换机类型 topic
	@ApiOperation(value='topic发送接口',notes='发送到topicExchange。exchange转发消息时,会往routingKey匹配的queue发送,*代表一个单词,#代表0个或多个单词。')
	@GetMapping(value='/topicSend')
	public Object topicSend(String routingKey,String message) throws AmqpException, UnsupportedEncodingException {

		if(null == routingKey) {
			routingKey='guankong.wulian';
		}
		MessageProperties messageProperties = new MessageProperties();
		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
		//fanout模式只往exchange里发送消息。分发到exchange下的所有queue
		rabbitTemplate.send('topicExchange', routingKey, new Message(message.getBytes('UTF-8'),messageProperties));
		return 'message sended : routingKey >'+routingKey+';message > '+message;
	}

消费者

 //topic 模式
	@RabbitListener(queues='topic.keliu')
	public void topicReceiveq1(String message) {
		System.out.println('Topic模式 客流事件 received message : ' +message);
	}

	@RabbitListener(queues='topic.huozai')
	public void topicReceiveq2(String message) {
		System.out.println('Topic模式 火灾事件 received  message : ' +message);
	}

运行结果

http://localhost:9001/topicSend?message=%E6%B5%8B%E8%AF%95%E4%B8%BB%E9%A2%98%E6%A8%A1%E5%BC%8F&routingKey=wulian.guankong
在这里插入图片描述

PRC模式

PRC工作模式: 通过消息队列实现PRC功能, 客户端发送消息到消费队列, 服务端进行消费消息执行程序将结果再发送到回调队列, 供客户端使用. 是一种双向生产消费模式。
1、 客户端既是生产者又是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列
2、 服务端监听PRC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果。
3、服务端将RPC返回的结果发送到RPC响应队列。

代码实例
配置

@Configuration
public class RpcConfig {

    @Bean
    public Queue rpcQueue() {
        return new Queue('rpcQueue');
    }
}

生产者

@GetMapping('/rpcSend')
	public void rpcSend() {
		Object receive = rabbitTemplate.convertSendAndReceive('rpcQueue','我是客户端 rpc send message');
		System.out.println('【发送端接收消息】' + receive);
	}

消费者

@RabbitListener(queues='rpcQueue')
	public String rpcListener(String message) {
		System.out.println('【rpc接收消息】' + message);
		return '我是消费端 rpc 返回' + message;
	}

运行结果
在这里插入图片描述

代码地址https:///zhang-le03100110/springboot-rabbitmq.git

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多