MINA源码分析的札记1--Write流程
时间:2010-11-13 14:44来源:互联网 作者:互联网 点击:145次
从IoSession调用write的过程: IoSession.write(object message) 真正实现这个方法的是AbstractIoSession 1、创建writeFuture对象,用于异步操作的返回 2、将传入的Object对象,包装成WriteRequest对象,交给IoFilterChain去处理。 3、核心的实现就是这些代码: // Now, we can write the message. First, create a future WriteFuture writeFuture = new DefaultWriteFuture(this); WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress); // Then, get the chain and inject the WriteRequest into it IoFilterChain filterChain = getFilterChain(); filterChain.fireFilterWrite(writeRequest); //..... return writeFuture; 下面看IoFilterChain是怎么处理的? 首先,IoFilterChain是用一个双向列表来保存它所包含的所有filter的,大概的结构如下: head <-> filter1 <-> filter2 <-> ..... <-> filterN <-> tail 其中head和tail是两个固有、内置、特殊的filter,主要是衔接和过渡的功能。如tail就是负责调用IoHandler的功能。 回到IoFilterChain怎么处理write的话题。 4、IoFilterChain从列表中找到tail,从tail开始查找filter,顺序调用每个filter的filterWrite()方法 典型实现: public void fireFilterWrite(WriteRequest writeRequest) { Entry tail = this.tail; callPreviousFilterWrite(tail, session, writeRequest); } private void callPreviousFilterWrite(Entry entry, IoSession session, WriteRequest writeRequest) { try { IoFilter filter = entry.getFilter(); NextFilter nextFilter = entry.getNextFilter(); filter.filterWrite(nextFilter, session, writeRequest); } catch (Throwable e) { writeRequest.getFuture().setException(e); fireExceptionCaught(e); } } 5、最后必然就掉到head这个filter的filterWrite,它的实现应该有些特殊,才能把消息发送给IoProcessor 将消息放入发送队列中,然后调用IoProcessor flush出去。 public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { AbstractIoSession s = (AbstractIoSession) session; s.getWriteRequestQueue().offer(s, writeRequest); if (!s.isWriteSuspended()) { s.getProcessor().flush(s); } } WriteRequestQueue的默认实现就是java.util.concurrent.ConcurrentLinkedQueue,舍去传入的session对象。? 在看processor怎么flush出去之前,先备忘下面的东西。 NOTE1:在这个filter传递过程中,IoSession一直被传递下去: filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) NOTE2:其实head这个filter就只实现了两个方法,也就是它只关心这两个操作: private class HeadFilter extends IoFilterAdapter { @SuppressWarnings("unchecked") @Override public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { } @SuppressWarnings("unchecked") @Override public void filterClose(NextFilter nextFilter, IoSession session) throws Exception { } } 6、IoProcessor是怎么处理的? IoProcessor的flush方法在AbstractPollingIoProcessor类中实现。 它将传入的session对象加入到flushingSessions列表, 然后调用processor内在的selector.wakeUp (java.nio.channels.Selector), 而在AbstractPollingIoProcessor类中独立运行的Processor线程,就回从select()等待中被唤醒。 7、AbstractPollingIoProcessor.Processor线程 被唤醒之后,从flushingSessions中取出session对象,再从session中取出WriteRequest对象 如果WriteRequest对象是IoBuffer类型的,则: if (buf.remaining() <= length) { return session.getChannel().write(buf.buf()); } 如果WriteRequest对象是File对象,则: region.getFileChannel().transferTo( region.getPosition(), length, session.getChannel()); region是FileRegion对象。就是将文件内容传输到session的Channel上去。 【这部分省略了很多细节,具体了解细节的话,主要看AbstractPollingIoProcessor类和NioProcessor类】 8、自此,才将数据真正发送到网络上去。 |
|