RabbitMQ的集群
普通集群(副本集群)
官网介绍:
All data/state required for the operation of a RabbitMQ broker is replicated across all nodes. An exception to this are message queues, which by default reside on one node, though they are visible and reachable from all nodes. To replicate queues across nodes in a cluster, use a queue type that supports replication.
默认情况下:rabbitMQ代理操作所需的所有数据/状态都将跨所有节点复制.这方面的一个例外是消息队列,默认情况下,消息队列位于一个节点上,尽管他们可以从所有节点看到和访问
集群架构图:
核心解决问题:当集群中某一时刻master节点宕机,可以对Queue中信息,进行备份
# 1 集群规划
node1: 10.15.0.4 mq1 master节点
node2: 10.15.0.3 mq2 repl1 副本节点
node3: 10.15.0.5 mq3 repl1 副本节点
# 2 环境准备
- 切换到root
su -
- 修改ip为 10.15.0.3/4/5
vim /etc/sysconfig/network-scripts/ifcfg-ens33
- 修改主机名为 mq1/mq2/mq3
vim /etc/hostname
- 三台机器上安装rabbitmq
参考前文
- 克隆三台机器和ip映射
vim /etc/hosts 加入:
10.15.0.3 mq1
10.15.0.4 mq2
10.15.0.5 mq3
- 同步cookie文件
scp /var/lib/rabbitmq/.erlang.cookie root@mq2:/var/lib/rabbitmq
scp /var/lib/rabbitmq/.erlang.cookie root@mq3:/var/lib/rabbitmq
# 3 开启集群
- 以后台运行rabbitmq
先关闭之前开启的rabbitmq服务 systemctl stop rabbitmq-server
再以后台的方式开启rabbitmq服务 rabbitmq-server -detached
访问mq1
访问mq2
访问mq3
都可以访问成功。
- 在node2和node3执行加入集群命令
1. 关闭 rabbitmqctl stop_app
2. 加入集群 rabbitmqctl join_cluster rabbit@mq1
3. 启动服务 rabbitmqctl start_app
- 查看集群状态,任意节点执行:
rabbitmqctl cluster_status
如果出现这样的错误:
Clustering node rabbit@mq1 with rabbit@mq2 ... Error: mnesia_not_running
代表主节点没有运行
先把主节点启动一下
如果出现这样的错误:
Clustering node rabbit@mq1 with rabbit@mq2 ... Error: mnesia_unexpectedly_running
代表当前节点没有关闭
先执行一下 rabbitmqctl stop_app ,再执行 rabbitmqctl join_cluster rabbit@mq1
如果出现这样的提示代表集群创建成功
[root@mq1 ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@mq1 ...
[{nodes,[{disc,[rabbit@mq1,rabbit@mq2,rabbit@mq3]}]},
{running_nodes,[rabbit@mq2,rabbit@mq3,rabbit@mq1]},
{cluster_name,<<"rabbit@mq3">>},
{partitions,[]}]
通过we访问也能看到集群的三个节点
通过java代码测试一下:
-
主节点mq2
-
从节点mq1
虽然说从两个从节点都能看到主节点中的队列,但是当主节点宕机后,两个从节点并不能用作服务.
当我们关掉主节点mq2的服务后,mq1和mq3就看不到任何队列了
镜像集群
官网介绍:
Quorum queues is an alternative, more modern queue type that offers high availability via replication and focuses on data safety.
镜像队列机制就是将队列在三个节点之间设置主从关系,消息会在三个节点之间进行自动同步,且如果其中一个节点不可用,并不会导致消息丢失或服务不可用的情况,提升MQ集群的整体高可用性
集群架构图:
配置集群架构
# 0 策略说明
rabbitmqctl set_policy [-p <vhostpath>] [--priority <priority>] [--apply-to <apply-to >] <name> <pattern> <definition>
-p vhostpath: 可选参数,针对指定vhost下的queue进行设置
Name: policy的名称
Pattern: queue的匹配模式(正则表达式)
Defintion: 镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
ha-mode: 指明镜像队列的模式,有效值为all/exactly/nodes
all: 表示在集群中所有的节点上进行镜像
exactly: 表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
nodes: 表示在指定的节点上进行镜像,节点名称通过ha-params指定
ha-params: ha-mode模式需要用到的参数
ha-sync-mode: 进行队列中消息的同步方式,有效值为automatic和manual
priority: 可选参数,policy的优先级 数字越大优先级越高
# 1 查看当前策略
rabbitmqctl list_policies
# 2 添加策略
rabbitmqctl set_policy ha-all '^hello' '{"ha-mode":"all","ha-sync-mode":"automatic"}'
说明:策略正则表达式为"^"表示所有匹配所有队列名称 ^hello:匹配hello开头队列
此时通过代码向主节点发送一条消息
三个节点都能收到消息,并且可以看到policy栏的策略也是我们刚刚设置的
当我们的主节点mq2宕机后
两个从节点中的队列依然存在
并且依然可以被消费者使用,此时的主节点是mq1
当我们再次启动主节点mq2后,mq2就成为了一个镜像节点,mq1依然是主节点
当日后不想使用这个策略了可以通过
# 3 删除策略
rabbitmqctl clear_policy ha-all
来删除策略
安装配置HAProxy
# 1 下载haproxy的压缩包
https://src./repo/pkgs/haproxy/
# 2 安装HAProxy
- 上传haproxy源码包
- 解压
tar -zxvf haproxy-2.1.10.tar.gz -C /usr/local
- 进入目录,进行编译安装
cd /usr/local/haproxy-2.1.10/
make TARGET=linux31 PREFIX=/usr/local/haproxy
make install PREFIX=/usr/local/haproxy
mkdir /etc/haproxy
- 赋权
groupadd -r -g 149 haproxy
useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy
- 创建haproxy配置文件
vim /etc/haproxy/haproxy.cfg
#haproxy的客户页面
# 下面这个是自己安装HAProxy的服务器的IP和端口号
bind 10.15.0.4:8888
mode http
option httplog
stats uri /haproxy
# 这个是配置登录名和密码
stats auth root:123456
stats refresh 5s
stats enable
listen haproxy #负载均衡的名字
bind 0.0.0.0:5666 #对外提供的虚拟的端口
option tcplog
mode tcp
#轮询算法
balance roundrobin
#下面这个是配置的负载均衡的后台的服务器和端口号
server mq1 10.15.0.3:5672 check inter 5000 rise 2 fall 2
server mq2 10.15.0.4:5672 check inter 5000 rise 2 fall 2
server mq3 10.15.0.5:5672 check inter 5000 rise 2 fall 2
# 3 启动HAProxy负载
/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg
- 查看进程状态
ps -ef |grep haproxy
- 访问web页面
http://10.15.0.4:8888/haproxy
测试代码:
工具类
public class RabbitMQUtils {
private static ConnectionFactory connectionFactory;
//重量级资源放到静态代码块中给执行,这样执行的时候只会在程序中new一次
static {
//创建连接mq的工厂
connectionFactory = new ConnectionFactory();
//设置连接rabbitmq主机
connectionFactory.setHost("10.15.0.4");
//设置端口号
connectionFactory.setPort(5666); //改成HAProxy的配置文件里写的虚拟端口
//设置连接哪个虚拟主机
connectionFactory.setVirtualHost("/");
//设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
}
//定义提供连接对象的方法
public static Connection getConnection() {
try {
//返回连接对象
return connectionFactory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public static void closeConnAndChannel(Channel channel, Connection conn) {
try {
if (channel != null) channel.close();
if (conn != null) conn.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
consumer测试
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMQUtils.getConnection();
//创建通道
Channel channel = connection.createChannel();
//通道绑定对象
channel.queueDeclare("hello",false,false,false,null);
//消费消息
/**
* 参数1:消费哪个队列的消息 队列名称
* 参数2:开始消息的自动确认机制
* 参数3:消费消息时的回调接口
*/
channel.basicConsume("hello",true,new DefaultConsumer(channel){
/**
*
* @param consumerTag
* @param envelope
* @param properties
* @param body 消息队列中取出的消息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("new String(body) = " + new String(body));
}
});
//Consumer中要一直监听队列中的消息,因此不建议关闭
// channel.close();
// connection.close();
}
}
provider测试
public class Provider {
@Test
public void sendMessage() throws IOException, TimeoutException {
Connection connection = RabbitMQUtils.getConnection();
//获取连接中的通道对象
Channel channel = connection.createChannel();
//通道绑定对应消息队列(通道中传的就是消息队列的消息)
/**
* 参数1:队列名称 如果队列不存在则自动创建
* 参数2:是否持久化 如果为false 即不持久化 在重启rabbitmq的时候队列会消失
* 如果为true 即使队列持久化了 消息依然会消失
* 想要消息也持久化得在发布消息的参数3设置:
* MessageProperties.PERSISTENT_TEXT_PLAIN
* 参数3:是否独占队列
* 参数4:是否在消费完成后自动删除队列
* 参数5:附加参数
*/
channel.queueDeclare("hello",false,false,false,null);
//发布消息
/**
* 参数1:交换机名称
* 参数2:队列名称
* 参数3:传递消息额外设置
* 参数4:消息的具体内容
*/
channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"helloWorld".getBytes());
//关闭通道
// channel.close();
// connection.close();
//通过工具类关闭通道
RabbitMQUtils.closeConnAndChannel(channel,connection);
}
}
能够正确收到消息
集群中的队列也能正确接收消息
|