1、什么是JMS?
JMS中文名叫Java消息服务,它是一种规范,是javaEE的13种核心规范之一。关于javaEE的13种核心规范,网上一搜一大堆,这里不再赘述。JMS就是天上飞的理念,而各种MQ就是这种理念的落地实现。比如activeMQ、rocketMQ等,都要遵循JMS这个规范。
2、JMS的结构和特点:
TextMessage textMessage = new session.createTextMessage("这是一条TextMessage");
// TextMessage 类型设置消息属性
textMessage.setStringProperty("property", "VIP");
在消费者中取出消息后:
textMessage.getStringProperty("property")
即可取出消息属性。
注意上面JMS结构的层级关系。
3、如何保证消息的可靠性?(面试重点)
一般要从三个角度去回答(持久性、事务、签收)。
- 持久性:持久,是MQ挂了,消息依然存在,非持久,就是MQ挂了,消息就没了。
队列生产者的持久性:
// 这个producer是队列
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 非持久
producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 持久
队列设置为非持久,如果生产者将消息发送到MQ后,MQ挂了,那么这些消息就没了,即使MQ恢复正常也没了。队列设置为持久,那么消息只要还没消费就还会有。activeMQ的队列默认设置了持久,可保证消息只被传送一次和成功使用一次。
主题的持久性:
主题要设置持久,生产者和消费者的编码方式与之前都有点儿不一样,代码如下:
public class Consumer {
private static final String URL = "tcp://192.168.x.xxx:61616";
private static final String TOPIC_NAME = "topic_test";
public static void main(String[] args) throws Exception {
// 1. 创建factory工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 2. 创建connection连接
Connection connection = factory.createConnection();
connection.setClientID("张三");
System.out.println("张三订阅");
// 3. 创建session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4. 订阅topic
Topic topic = session.createTopic(TOPIC_NAME);
TopicSubscriber subscriber = session.createDurableSubscriber(topic,"备注信息");
// 5. 启动
connection.start();
// 6. 消费topic的消息
Message message = subscriber.receive();
while (null != message){
TextMessage textMessage = (TextMessage) message;
System.out.println("收到消息:" + textMessage.getText());
message = subscriber.receive(5000L);
}
// 6. 关闭资源(顺着申请,倒着关闭)
session.close();
connection.close();
System.out.println("5秒还没消息来,我溜了!");
}
}
public class Productor {
private static final String URL = "tcp://192.168.0.103:61616";
private static final String TOPIC_NAME = "topic_test";
public static void main(String[] args) throws Exception {
// 1. 创建factory工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 2. 创建connection连接
Connection connection = factory.createConnection();
// 3. 创建session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4. 创建目的地topic
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(topic);
// 设置持久性
//producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 非持久
producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 持久
connection.start();
// 5. 生产消息
for (int i = 1; i <= 3; i++) {
TextMessage message = session.createTextMessage("queue" + i);
// 6. 将消息发送到MQ
producer.send(message);
}
// 7. 关闭资源(顺着申请,倒着关闭)
producer.close();
session.close();
connection.close();
System.out.println("发送到MQ完成!");
}
}
主题设置了持久的话,一定要先运行一次消费者,等于向MQ注册,表示我订阅了这个主题。然后再运行生产者发送信息,此时,不论消费者是否还在线,都会接收到消息,不在线的话,下次连接的时候,会把没有收过的消息都接收下来。
- 事务:创建session的时候要传两个参数,一个是事务,一个是签收。
生产者事务:
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
第一个参数就是表示事务,设置为false,表示只要执行了send方法,消息就进入到队列中了;如果设置为true,需要send后再执行commit,消息才会被提交到队列中。所以在session提交前,需要调commit方法,如下:
try{
//没问题就提交事务
session.commit();
}catch(Exception e){
//有问题就回滚
session.rollback();
}finally{
producer.close();
session.close();
}
生产者主事务,不管签收,因为消费者才需要签收嘛。生产者设置了事务,签收机制就无所谓了,只是这个方法需要传一个签收机制,其实事务设置为true后,起作用的就是事务了。
消费者事务:
如果消费者开启了事务,进行消费时而没有commit的话,MQ会认为你还没有成功消费消息,就会出现重复消费的情况,所以消费者一般不开启事务,而是以签收机制为主。
签收:签收机制有四种,用得较多的是自动和手动两种方式。
消费者非事务的手动签收:
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
如果这个时候直接运行消费者,发现又可以重复消费消息,因为MQ不知道你已经签收消息了。所以在receive到消息后,应该手动签收,才不会重复消费,如下:
while (null != message){
TextMessage textMessage = (TextMessage) message;
textMessage.acknowledge(); // 手动签收
System.out.println("收到消息:" + textMessage.getText());
message = subscriber.receive(5000L);
}
消费者开启事务的情况下的签收:
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
开启了事务,就会自动设置为自动签收,即使后面那个参数设置了手动签收,也不起作用了。所以,不需要调用acknowledge()方法进行签收。如果开启了事务,设置了手动签收,调用了acknowledge()方法,但是没有commit,还是会重复消费。
总之,在事务会话中,当一个事务被成功提交则消息被自动签收,如果事务回滚,则消息会被再次传递。非事务会话中,消息何时被确认取决于创建会话时的签收模式。
小结:不能容忍丢失消息,就用持久订阅,可以容忍丢失消息,就用非持久订阅