分享

在openfire中添加消息队列,防止聊天信息丢失方法

 WindySky 2016-02-18
    经过测试和研究openfire源码,发现当在断网情况下,大概一分钟左右服务器才断定客户端断开连接。而在这一分钟内发送的消息是没有存入离线消息表里的。致使消息丢失。当然这个时间是可以设置的,但是设置时长没有实际价值。因为不能让服务器实时判断客户端是否连接。于是添加了消息队列机制,处理消息丢失的问题。
一下是具体实现:
一、在NIOConnection.java中的deliver方法中,添加消息队列。
所有的在线message都会经过这个 方法。

        //聊天消息放入消息队列
        if(packet instanceof Message) {
        Message message = (Message)packet;
        Type type = message.getType();
        if("chat".equals(type.name())||"groupchat".equals(type.name())){
            Element request = message.getElement().element("request");
            if(request!=null){
            Message createCopy = message.createCopy();
            messageQueue.offer(createCopy);
            //从队列中获取值发送
                        if(messageQueue.size()==1){
                        deliverMessage(createCopy);
                        }
                        return;
            }
            }
        }
deliverMessage是一个发送message的方法,是根据发送的代码重构的,具体如下:
public void deliverMessage(Message queueMessage) throws UnauthorizedException{
    if (isClosed()) {
            backupDeliverer.deliver(queueMessage);
        }
        else {
    ByteBuffer buffer = ByteBuffer.allocate(4096);
       buffer.setAutoExpand(true);
    boolean errorDelivering = false;
       try {
        // OF-464: if the connection has been dropped, fail over to backupDeliverer (offline)
        if (!ioSession.isConnected()) {
        throw new IOException("Connection reset/closed by peer");
        }
           XMLWriter xmlSerializer =
                   new XMLWriter(new ByteBufferWriter(buffer, encoder.get()), new OutputFormat());
           xmlSerializer.write(queueMessage.getElement());
           xmlSerializer.flush();
           if (flashClient) {
               buffer.put((byte) '\0');
           }
           buffer.flip();
           ioSession.write(buffer);
       }
       catch (Exception e) {
           Log.debug("Error delivering packet:\n" + queueMessage, e);
           errorDelivering = true;
       }
       if (errorDelivering) {
           close();
           // Retry sending the packet again. Most probably if the packet is a
           // Message it will be stored offline
           backupDeliverer.deliver(queueMessage);
       }
       else {
           session.incrementServerPacketCount();
       }
        }
    }
二、在一分钟左右服务器确定客户端断开连接时,会触发close()方法,在close方法中将消息队列中的消息放入离线消息表。
 if (closedSuccessfully) {
        //从消息队列中获取消息,插入到离线数据库
        OfflineMessageStore instance = OfflineMessageStore.getInstance();
        Iterator iterator = messageQueue.iterator();
        while(iterator.hasNext()){
        instance.addMessage(iterator.next());
        }
        messageQueue.clear();
            notifyCloseListeners();
        }
三、在接收消息回执时,会操作消息队列。这里实在ClientStanzeHandler.java中,processMessage()方法中获取connection,这里需要说明一下,之所以在这里操作,是因为消息队列是存储在接收方的connection中,而消息回执接收的connection是自己的connection。从这里发送回执message的时候要获取的connection为接收方的connection,进而才能操作对方的消息队列。
代码如下:
      //判断是不是消息回执,如果是就获取消息队列进行操作
        Element received = packet.getElement().element("received");
        if(received!=null){
       if(connection!=null && connection instanceof NIOConnection){
        NIOConnection conn = (NIOConnection)connection;
        Queue messageQueue = conn.getMessageQueue();
        String attributeValue = received.attributeValue("id");
    Message peek = messageQueue.peek();
    //如果相等就从队列清除
    if(attributeValue.equals(peek.getID())){
    messageQueue.poll();
    //判断队列中是否还有message,如果有的话继续发送
    if(messageQueue.size()!=0){
    peek = messageQueue.peek();
    conn.deliverMessage((peek));
    }
    }else{
    //继续从队列中发送message
    if(messageQueue.size()!=0){
    peek = messageQueue.peek();
    conn.deliverMessage(peek);
    }
    }
       }
        }
至此,消息队列机制完成。经测试消息再也没有丢失。
内部还有一些细节,通过代码可以理解不在赘述。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多