分享

RabbitMQ集群

 路人甲Java 2022-12-14 发布于北京

RabbitMQ的集群

普通集群(副本集群)

官网介绍:

All data/state required for the operation of a RabbitMQ broker is replicated across all nodes. An exception to this are message queues, which by default reside on one node, though they are visible and reachable from all nodes. To replicate queues across nodes in a cluster, use a queue type that supports replication.

默认情况下:rabbitMQ代理操作所需的所有数据/状态都将跨所有节点复制.这方面的一个例外是消息队列,默认情况下,消息队列位于一个节点上,尽管他们可以从所有节点看到和访问

集群架构图:

image-20210217224724487

核心解决问题:当集群中某一时刻master节点宕机,可以对Queue中信息,进行备份

# 1 集群规划
	node1: 10.15.0.4 mq1 master节点
	node2: 10.15.0.3 mq2 repl1 副本节点
	node3: 10.15.0.5 mq3 repl1 副本节点

# 2 环境准备
- 切换到root
	 	su -
	
- 修改ip为		10.15.0.3/4/5
		vim /etc/sysconfig/network-scripts/ifcfg-ens33

- 修改主机名为  mq1/mq2/mq3
		vim /etc/hostname

- 三台机器上安装rabbitmq
		参考前文

- 克隆三台机器和ip映射
	vim /etc/hosts 加入:
		10.15.0.3 mq1
        10.15.0.4 mq2
        10.15.0.5 mq3

- 同步cookie文件
		scp /var/lib/rabbitmq/.erlang.cookie root@mq2:/var/lib/rabbitmq
		scp /var/lib/rabbitmq/.erlang.cookie root@mq3:/var/lib/rabbitmq

# 3 开启集群
- 以后台运行rabbitmq
		先关闭之前开启的rabbitmq服务  systemctl stop rabbitmq-server
		
		再以后台的方式开启rabbitmq服务  rabbitmq-server -detached

访问mq1

访问mq2

访问mq3

都可以访问成功。

- 在node2和node3执行加入集群命令
		1. 关闭  rabbitmqctl stop_app
		2. 加入集群  rabbitmqctl join_cluster rabbit@mq1
		3. 启动服务  rabbitmqctl start_app
		
- 查看集群状态,任意节点执行:
		rabbitmqctl cluster_status

如果出现这样的错误:

Clustering node rabbit@mq1 with rabbit@mq2 ... Error: mnesia_not_running

代表主节点没有运行

先把主节点启动一下

如果出现这样的错误:

Clustering node rabbit@mq1 with rabbit@mq2 ... Error: mnesia_unexpectedly_running

代表当前节点没有关闭

​ 先执行一下 rabbitmqctl stop_app,再执行 rabbitmqctl join_cluster rabbit@mq1

如果出现这样的提示代表集群创建成功

[root@mq1 ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@mq1 ...
[{nodes,[{disc,[rabbit@mq1,rabbit@mq2,rabbit@mq3]}]},
 {running_nodes,[rabbit@mq2,rabbit@mq3,rabbit@mq1]},
 {cluster_name,<<"rabbit@mq3">>},
 {partitions,[]}]

通过we访问也能看到集群的三个节点

通过java代码测试一下:

  • 主节点mq2

  • 从节点mq1

  • 从节点mq3

虽然说从两个从节点都能看到主节点中的队列,但是当主节点宕机后,两个从节点并不能用作服务.

当我们关掉主节点mq2的服务后,mq1和mq3就看不到任何队列了

  • 主节点

  • 从节点mq3

  • 从节点mq1


镜像集群

官网介绍:

Quorum queues is an alternative, more modern queue type that offers high availability via replication and focuses on data safety.

镜像队列机制就是将队列在三个节点之间设置主从关系,消息会在三个节点之间进行自动同步,且如果其中一个节点不可用,并不会导致消息丢失或服务不可用的情况,提升MQ集群的整体高可用性

集群架构图:

image-20210217230836638

配置集群架构

# 0 策略说明
	rabbitmqctl set_policy [-p <vhostpath>] [--priority <priority>] [--apply-to <apply-to >]   <name> <pattern>  <definition>
	-p vhostpath: 可选参数,针对指定vhost下的queue进行设置
	Name: policy的名称
	Pattern: queue的匹配模式(正则表达式)
	Defintion: 镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
		ha-mode: 指明镜像队列的模式,有效值为all/exactly/nodes
			all: 表示在集群中所有的节点上进行镜像
			exactly: 表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
			nodes: 表示在指定的节点上进行镜像,节点名称通过ha-params指定
		ha-params: ha-mode模式需要用到的参数
		ha-sync-mode: 进行队列中消息的同步方式,有效值为automatic和manual
		priority: 可选参数,policy的优先级  数字越大优先级越高

# 1 查看当前策略
	rabbitmqctl list_policies

# 2 添加策略
        rabbitmqctl set_policy ha-all '^hello' '{"ha-mode":"all","ha-sync-mode":"automatic"}'
	说明:策略正则表达式为"^"表示所有匹配所有队列名称  ^hello:匹配hello开头队列

此时通过代码向主节点发送一条消息

三个节点都能收到消息,并且可以看到policy栏的策略也是我们刚刚设置的

当我们的主节点mq2宕机后

两个从节点中的队列依然存在

并且依然可以被消费者使用,此时的主节点是mq1

当我们再次启动主节点mq2后,mq2就成为了一个镜像节点,mq1依然是主节点

当日后不想使用这个策略了可以通过

# 3 删除策略
	rabbitmqctl clear_policy ha-all

来删除策略

安装配置HAProxy

# 1 下载haproxy的压缩包
	https://src./repo/pkgs/haproxy/

# 2 安装HAProxy
- 上传haproxy源码包

- 解压
		 tar -zxvf haproxy-2.1.10.tar.gz -C /usr/local

- 进入目录,进行编译安装
		cd /usr/local/haproxy-2.1.10/
		
		make TARGET=linux31 PREFIX=/usr/local/haproxy
		
		make install PREFIX=/usr/local/haproxy
		
		mkdir /etc/haproxy

- 赋权
		groupadd -r -g 149 haproxy
		
		useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy

- 创建haproxy配置文件
		vim /etc/haproxy/haproxy.cfg
#haproxy的客户页面   
 # 下面这个是自己安装HAProxy的服务器的IP和端口号  
 bind 10.15.0.4:8888
 mode http
 option httplog
 stats uri /haproxy
 # 这个是配置登录名和密码  
 stats auth root:123456
 stats refresh 5s
 stats enable
 listen haproxy #负载均衡的名字  
 bind 0.0.0.0:5666 #对外提供的虚拟的端口  
 option tcplog
 mode tcp
 #轮询算法  
 balance roundrobin
 #下面这个是配置的负载均衡的后台的服务器和端口号  
 server mq1 10.15.0.3:5672 check inter 5000 rise 2 fall 2
 server mq2 10.15.0.4:5672 check inter 5000 rise 2 fall 2
 server mq3 10.15.0.5:5672 check inter 5000 rise 2 fall 2
# 3 启动HAProxy负载
	/usr/local/haproxy/sbin/haproxy  -f /etc/haproxy/haproxy.cfg

- 查看进程状态
		ps -ef |grep haproxy

- 访问web页面
		http://10.15.0.4:8888/haproxy

测试代码:

工具类

public class RabbitMQUtils {


    private static ConnectionFactory connectionFactory;

    //重量级资源放到静态代码块中给执行,这样执行的时候只会在程序中new一次
    static {
        //创建连接mq的工厂
        connectionFactory = new ConnectionFactory();
        //设置连接rabbitmq主机
        connectionFactory.setHost("10.15.0.4");
        //设置端口号
        connectionFactory.setPort(5666);  //改成HAProxy的配置文件里写的虚拟端口
        //设置连接哪个虚拟主机
        connectionFactory.setVirtualHost("/");
        //设置访问虚拟主机的用户名和密码
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
    }
    //定义提供连接对象的方法
    public static Connection getConnection() {
        try {
            //返回连接对象
            return  connectionFactory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void closeConnAndChannel(Channel channel, Connection conn) {
        try {
            if (channel != null) channel.close();
            if (conn != null) conn.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

consumer测试

public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException  {

        Connection connection = RabbitMQUtils.getConnection();

        //创建通道
        Channel channel = connection.createChannel();

        //通道绑定对象
        channel.queueDeclare("hello",false,false,false,null);

        //消费消息
        /**
         * 参数1:消费哪个队列的消息 队列名称
         * 参数2:开始消息的自动确认机制
         * 参数3:消费消息时的回调接口
         */
        channel.basicConsume("hello",true,new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag
             * @param envelope
             * @param properties
             * @param body  消息队列中取出的消息
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("new String(body) = " + new String(body));
            }
        });

        //Consumer中要一直监听队列中的消息,因此不建议关闭
//        channel.close();
//        connection.close();
    }
}

provider测试

public class Provider {

    @Test
    public void sendMessage() throws IOException, TimeoutException {

        Connection connection = RabbitMQUtils.getConnection();

        //获取连接中的通道对象
        Channel channel = connection.createChannel();

        //通道绑定对应消息队列(通道中传的就是消息队列的消息)
        /**
         * 参数1:队列名称  如果队列不存在则自动创建
         * 参数2:是否持久化  如果为false 即不持久化 在重启rabbitmq的时候队列会消失
         *                  如果为true 即使队列持久化了  消息依然会消失
         *                      想要消息也持久化得在发布消息的参数3设置:
         *                          MessageProperties.PERSISTENT_TEXT_PLAIN
         * 参数3:是否独占队列
         * 参数4:是否在消费完成后自动删除队列
         * 参数5:附加参数
         */
        channel.queueDeclare("hello",false,false,false,null);

        //发布消息
        /**
         * 参数1:交换机名称
         * 参数2:队列名称
         * 参数3:传递消息额外设置
         * 参数4:消息的具体内容
         */
        channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"helloWorld".getBytes());

        //关闭通道
//        channel.close();
//        connection.close();

        //通过工具类关闭通道
        RabbitMQUtils.closeConnAndChannel(channel,connection);


    }
}

能够正确收到消息

集群中的队列也能正确接收消息



    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多