分享

Mina编程的两个注意点

 hehffyy 2011-11-30
  • 1. 首先,这是一个nio的框架,仍然是采用reactor模式,知道这一点后,那么编程就没有什么难的。nio的编程,无外乎就是这些套路, 再进一步说,网络编程,也就是这些套路了。
  • 2. 那么剩下编程的注意点,也就是编解码的处理以及最后的业务逻辑的处理。



2.1 编解码的注意点:因为在网络编程中,client和server之间,往往需要完整的接收到一条消息的后,才交给业务逻辑处理。具体可以参看http://jimmee./blog/617544,其中,我们常常是继承自CumulativeProtocolDecoder来实现自己的解码器,主要的docode的方法,其作用是将本次数据和上次接收到的数据(如果doDecode方法没有处理完的话),统一放到一个buffer中,之后扔给 doDecode方法处理,若处理完之后,还有剩下的数据,则继续缓存。 

Java代码  收藏代码
  1. /** 
  2.     * Cumulates content of <tt>in</tt> into internal buffer and forwards 
  3.     * decoding request to {@link #doDecode(IoSession, IoBuffer, ProtocolDecoderOutput)}. 
  4.     * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt> 
  5.     * and the cumulative buffer is compacted after decoding ends. 
  6.     * 
  7.     * @throws IllegalStateException if your <tt>doDecode()</tt> returned 
  8.     *                               <tt>true</tt> not consuming the cumulative buffer. 
  9.     */  
  10.    public void decode(IoSession session, IoBuffer in,  
  11.            ProtocolDecoderOutput out) throws Exception {  
  12.        if (!session.getTransportMetadata().hasFragmentation()) {  
  13.            while (in.hasRemaining()) {  
  14.                if (!doDecode(session, in, out)) {  
  15.                    break;  
  16.                }  
  17.            }  
  18.   
  19.            return;  
  20.        }  
  21.   
  22.        boolean usingSessionBuffer = true;  
  23.        IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER);  
  24.        // If we have a session buffer, append data to that; otherwise  
  25.        // use the buffer read from the network directly.  
  26.        if (buf != null) {  
  27.            boolean appended = false;  
  28.            // Make sure that the buffer is auto-expanded.  
  29.            if (buf.isAutoExpand()) {  
  30.                try {  
  31.                    buf.put(in);  
  32.                    appended = true;  
  33.                } catch (IllegalStateException e) {  
  34.                    // A user called derivation method (e.g. slice()),  
  35.                    // which disables auto-expansion of the parent buffer.  
  36.                } catch (IndexOutOfBoundsException e) {  
  37.                    // A user disabled auto-expansion.  
  38.                }  
  39.            }  
  40.   
  41.            if (appended) {  
  42.                buf.flip();  
  43.            } else {  
  44.                // Reallocate the buffer if append operation failed due to  
  45.                // derivation or disabled auto-expansion.  
  46.                buf.flip();  
  47.                IoBuffer newBuf = IoBuffer.allocate(  
  48.                        buf.remaining() + in.remaining()).setAutoExpand(true);  
  49.                newBuf.order(buf.order());  
  50.                newBuf.put(buf);  
  51.                newBuf.put(in);  
  52.                newBuf.flip();  
  53.                buf = newBuf;  
  54.   
  55.                // Update the session attribute.  
  56.                session.setAttribute(BUFFER, buf);  
  57.            }  
  58.        } else {  
  59.            buf = in;  
  60.            usingSessionBuffer = false;  
  61.        }  
  62.   
  63.   // 上面操作完后,得到的buf是包含以前积累的数据(如果没有读取处理掉),再加上本次得到  
  64.        // 的数据,最后扔给doDecode处理。  
  65.        for (;;) {  
  66.            int oldPos = buf.position();  
  67.            boolean decoded = doDecode(session, buf, out);  
  68.            if (decoded) {  
  69.                if (buf.position() == oldPos) {  
  70.                    throw new IllegalStateException(  
  71.                            "doDecode() can't return true when buffer is not consumed.");  
  72.                }  
  73.   
  74.                if (!buf.hasRemaining()) {  
  75.                    break;  
  76.                }  
  77.            } else {  
  78.                break;  
  79.            }  
  80.        }  
  81.       
  82.        // if there is any data left that cannot be decoded, we store  
  83.        // it in a buffer in the session and next time this decoder is  
  84.        // invoked the session buffer gets appended to  
  85.        if (buf.hasRemaining()) {  
  86.            if (usingSessionBuffer && buf.isAutoExpand()) {  
  87.                buf.compact();  
  88.            } else {  
  89.                storeRemainingInSession(buf, session);  
  90.            }  
  91.        } else {  
  92.            if (usingSessionBuffer) {  
  93.                removeSessionBuffer(session);  
  94.            }  
  95.        }  
  96.    }  


可以看一个具体的实现:PrefixedStringDecoder的 doDecode方法,这个decode是消息长度+具体字节流的消息格式。 

Java代码  收藏代码
  1. protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {  
  2.        if (in.prefixedDataAvailable(prefixLength, maxDataLength)) {  
  3.            String msg = in.getPrefixedString(prefixLength, charset.newDecoder());  
  4.            out.write(msg);  
  5.            return true;  
  6.        }  
  7.   
  8.        return false;  
  9.    }  


其中, prefixedDataAvailable方法是判断得到IoBuffer里的数据是否满足一条消息了,如果再进去看这个方法的实现化,读取是,使用ByteBuffer的绝对位置的读取方法(这种读取不影响position的值的);当已经有一条完整的消息时,则用getPrefixedString读取(使用的是ByteBuffer的相对位置的读取方法,这会影响position的值,从而实际的消费掉数据). 

2.2 业务逻辑的处理注意点,一般都会使用一个线程池处理。这里就有一个问题,有时候同一个连接的消息处理,是希望按照顺序来进行的。这也很简单,不需要保证一个连接的所有的业务处理都限定在一个固定的线程中,但是需要保证当有消息需要处理时,这些消息的处理,都在同一个线程中完成。当然了,mina中也有了相应的实现OrderedThreadPoolExecutor。实现原理很简单: 
一个连接(session)对应一个消息队列,同时此session也放到一个队列中,说明这个session有消息需要处理。具体来说,就是使用SessionTasksQueue来表示一个Session的要处理的消息的队列; 
使用 
Java代码  收藏代码
  1. /** A queue used to store the available sessions */  
  2.   private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>();  

waitingSessions表示有消息需要处理的session的队列。 
Java代码  收藏代码
  1. /** 
  2.      * {@inheritDoc} 
  3.      */  
  4.     @Override  
  5.     public void execute(Runnable task) {  
  6.         if (shutdown) {  
  7.             rejectTask(task);  
  8.         }  
  9.   
  10.         // Check that it's a IoEvent task  
  11.         checkTaskType(task);  
  12.   
  13.         IoEvent event = (IoEvent) task;  
  14.           
  15.         // Get the associated session  
  16.         IoSession session = event.getSession();  
  17.           
  18.         // 得到保存session消息的队列  
  19.         SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session);  
  20.         Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;  
  21.           
  22.         boolean offerSession;  
  23.   
  24.         // propose the new event to the event queue handler. If we  
  25.         // use a throttle queue handler, the message may be rejected  
  26.         // if the maximum size has been reached.  
  27.         boolean offerEvent = eventQueueHandler.accept(this, event);  
  28.           
  29.         if (offerEvent) {  
  30.             // Ok, the message has been accepted  
  31.             synchronized (tasksQueue) {  
  32.                 // Inject the event into the executor taskQueue  
  33.                 tasksQueue.offer(event);  
  34.                   
  35.              // 如果session中的消息已经处理完了,说明没有线程在处理这个session,  
  36.                  // 重新提交给线程池处理  
  37.                 if (sessionTasksQueue.processingCompleted) {  
  38.                     sessionTasksQueue.processingCompleted = false;  
  39.                     offerSession = true;  
  40.                 } else {  
  41.                     offerSession = false;  
  42.                 }  
  43.   
  44.                 if (LOGGER.isDebugEnabled()) {  
  45.                     print(tasksQueue, event);  
  46.                 }  
  47.             }  
  48.         } else {  
  49.             offerSession = false;  
  50.         }  
  51.   
  52.         if (offerSession) {  
  53.             // As the tasksQueue was empty, the task has been executed  
  54.             // immediately, so we can move the session to the queue  
  55.             // of sessions waiting for completion.  
  56.             waitingSessions.offer(session);  
  57.         }  
  58.   
  59.         addWorkerIfNecessary();  
  60.   
  61.         if (offerEvent) {  
  62.             eventQueueHandler.offered(this, event);  
  63.         }  


对应的,看一下: 

Java代码  收藏代码
  1. private void runTasks(SessionTasksQueue sessionTasksQueue) {  
  2.            for (;;) {  
  3.                Runnable task;  
  4.                Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;  
  5.                  
  6.                synchronized (tasksQueue) {  
  7.                    task = tasksQueue.poll();  
  8.                    // 当已经没有任务时,处理此session的线程结束掉  
  9.                    if (task == null) {  
  10.                        sessionTasksQueue.processingCompleted = true;  
  11.                        break;  
  12.                    }  
  13.                }  
  14.   
  15.                eventQueueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);  
  16.   
  17.                runTask(task);  
  18.            }  
  19.        }  


小结:个人认为,只要对nio编程熟悉,对mina框架,只要明白了以上两点,就不再有什么大问题了。 

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多