经过测试和研究openfire源码,发现当在断网情况下,大概一分钟左右服务器才断定客户端断开连接。而在这一分钟内发送的消息是没有存入离线消息表里的。致使消息丢失。当然这个时间是可以设置的,但是设置时长没有实际价值。因为不能让服务器实时判断客户端是否连接。于是添加了消息队列机制,处理消息丢失的问题。
一下是具体实现:
一、在NIOConnection.java中的deliver方法中,添加消息队列。
所有的在线message都会经过这个 方法。
//聊天消息放入消息队列
if(packet instanceof Message) {
Message message = (Message)packet;
Type type = message.getType();
if("chat".equals(type.name())||"groupchat".equals(type.name())){
Element
request = message.getElement().element("request");
if(request!=null){
Message
createCopy = message.createCopy();
messageQueue.offer(createCopy);
//从队列中获取值发送
if(messageQueue.size()==1){
deliverMessage(createCopy);
}
return;
}
}
}
deliverMessage是一个发送message的方法,是根据发送的代码重构的,具体如下:
public void deliverMessage(Message queueMessage) throws
UnauthorizedException{
if (isClosed()) {
backupDeliverer.deliver(queueMessage);
}
else {
ByteBuffer buffer =
ByteBuffer.allocate(4096);
buffer.setAutoExpand(true);
boolean errorDelivering
= false;
try {
// OF-464: if the connection has been dropped,
fail over to backupDeliverer (offline)
if (!ioSession.isConnected()) {
throw new IOException("Connection reset/closed
by peer");
}
XMLWriter
xmlSerializer =
new XMLWriter(new ByteBufferWriter(buffer,
encoder.get()), new OutputFormat());
xmlSerializer.write(queueMessage.getElement());
xmlSerializer.flush();
if
(flashClient) {
buffer.put((byte) '\0');
}
buffer.flip();
ioSession.write(buffer);
}
catch (Exception e) {
Log.debug("Error delivering packet:\n" +
queueMessage, e);
errorDelivering = true;
}
if (errorDelivering) {
close();
// Retry
sending the packet again. Most probably if the packet is a
// Message
it will be stored offline
backupDeliverer.deliver(queueMessage);
}
else {
session.incrementServerPacketCount();
}
}
}
二、在一分钟左右服务器确定客户端断开连接时,会触发close()方法,在close方法中将消息队列中的消息放入离线消息表。
if (closedSuccessfully) {
//从消息队列中获取消息,插入到离线数据库
OfflineMessageStore instance =
OfflineMessageStore.getInstance();
Iterator iterator =
messageQueue.iterator();
while(iterator.hasNext()){
instance.addMessage(iterator.next());
}
messageQueue.clear();
notifyCloseListeners();
}
三、在接收消息回执时,会操作消息队列。这里实在ClientStanzeHandler.java中,processMessage()方法中获取connection,这里需要说明一下,之所以在这里操作,是因为消息队列是存储在接收方的connection中,而消息回执接收的connection是自己的connection。从这里发送回执message的时候要获取的connection为接收方的connection,进而才能操作对方的消息队列。
代码如下:
//判断是不是消息回执,如果是就获取消息队列进行操作
Element received =
packet.getElement().element("received");
if(received!=null){
if(connection!=null && connection
instanceof NIOConnection){
NIOConnection conn =
(NIOConnection)connection;
Queue messageQueue =
conn.getMessageQueue();
String attributeValue =
received.attributeValue("id");
Message peek =
messageQueue.peek();
//如果相等就从队列清除
if(attributeValue.equals(peek.getID())){
messageQueue.poll();
//判断队列中是否还有message,如果有的话继续发送
if(messageQueue.size()!=0){
peek =
messageQueue.peek();
conn.deliverMessage((peek));
}
}else{
//继续从队列中发送message
if(messageQueue.size()!=0){
peek =
messageQueue.peek();
conn.deliverMessage(peek);
}
}
}
}
至此,消息队列机制完成。经测试消息再也没有丢失。
|