分享

netty4源码分析

 知识存储馆 2014-06-29

本文为原创,转载请注明出处

netty4源码分析-accept

 

        本文分析服务端如何accept客户端的connect请求,首先看下selector的I/O多路复用的分发逻辑:

Java代码  收藏代码
  1. //NioEventLoop  
  2. private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {  
  3.         final NioUnsafe unsafe = ch.unsafe();  
  4.         if (!k.isValid()) {  
  5.             // close the channel if the key is not valid anymore  
  6.             unsafe.close(unsafe.voidPromise());  
  7.             return;  
  8.         }  
  9.         try {  
  10.             int readyOps = k.readyOps();  
  11.             if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {  
  12.                 unsafe.read();  
  13.                 if (!ch.isOpen()) {  
  14.                     // Connection already closed - no need to handle write.  
  15.                     return;  
  16.                 }  
  17.             }  
  18.             if ((readyOps & SelectionKey.OP_WRITE) != 0) {  
  19.                 // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write  
  20.                 ch.unsafe().forceFlush();  
  21.             }  
  22.             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {  
  23.                 // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking  
  24.                 // See https://github.com/netty/netty/issues/924  
  25.                 int ops = k.interestOps();  
  26.                 ops &= ~SelectionKey.OP_CONNECT;  
  27.                 k.interestOps(ops);  
  28.   
  29.                 unsafe.finishConnect();  
  30.             }  
  31.         } catch (CancelledKeyException e) {  
  32.             unsafe.close(unsafe.voidPromise());  
  33.         }  
  34.     }  

         当有OP_ACCEPT事件到达时,分发给NioMessageUnsafe的read方法进行处理。

Java代码  收藏代码
  1. //NioMessageUnsafe  
  2. public void read() {  
  3.             assert eventLoop().inEventLoop();  
  4.             final SelectionKey key = selectionKey();  
  5.             if (!config().isAutoRead()) {  
  6.                 int interestOps = key.interestOps();  
  7.                 if ((interestOps & readInterestOp) != 0) {  
  8.                     // only remove readInterestOp if needed  
  9.                     key.interestOps(interestOps & ~readInterestOp);  
  10.                 }  
  11.             }  
  12.             final ChannelConfig config = config();  
  13.             final int maxMessagesPerRead = config.getMaxMessagesPerRead();  
  14.             final boolean autoRead = config.isAutoRead();  
  15.             final ChannelPipeline pipeline = pipeline();  
  16.             boolean closed = false;  
  17.             Throwable exception = null;  
  18.             try {  
  19.                 for (;;) {  
  20.                     int localRead = doReadMessages(readBuf);  
  21.                     if (localRead == 0) {  
  22.                         break;  
  23.                     }  
  24.                     if (localRead < 0) {  
  25.                         closed = true;  
  26.                         break;  
  27.                     }  
  28.                     if (readBuf.size() >= maxMessagesPerRead | !autoRead) {  
  29.                         break;  
  30.                     }  
  31.                 }  
  32.             } catch (Throwable t) {  
  33.                 exception = t;  
  34.             }  
  35.             int size = readBuf.size();  
  36.             for (int i = 0; i < size; i ++) {  
  37.                 pipeline.fireChannelRead(readBuf.get(i));  
  38.             }  
  39.             readBuf.clear();  
  40.             pipeline.fireChannelReadComplete();  
  41.             if (exception != null) {  
  42.                 if (exception instanceof IOException) {  
  43.                     // ServerChannel should not be closed even on IOException because it can often continue  
  44.                     // accepting incoming connections. (e.g. too many open files)  
  45.                     closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);  
  46.                 }  
  47.   
  48.                 pipeline.fireExceptionCaught(exception);  
  49.             }  
  50.   
  51.             if (closed) {  
  52.                 if (isOpen()) {  
  53.                     close(voidPromise());  
  54.                 }  
  55.             }  
  56.         }  
  57.     }  

 其中doReadMessages方法由NioServerSocketChannel实现:

Java代码  收藏代码
  1. // NioServerSocketChannel  
  2.  protected int doReadMessages(List<Object> buf) throws Exception {  
  3.         SocketChannel ch = javaChannel().accept();  
  4.   
  5.         try {  
  6.             if (ch != null) {  
  7.                 buf.add(new NioSocketChannel(this, ch));  
  8.                 return 1;  
  9.             }  
  10.         } catch (Throwable t) {  
  11.             logger.warn("Failed to create a new channel from an accepted socket.", t);  
  12.   
  13.             try {  
  14.                 ch.close();  
  15.             } catch (Throwable t2) {  
  16.                 logger.warn("Failed to close a socket.", t2);  
  17.             }  
  18.         }  
  19.   
  20.         return 0;  
  21.     }  

        SocketChannel ch = javaChannel().accept()就为接受的客户端连接建立了一个已连接套接字socketChannel.

buf.add(new NioSocketChannel(this, ch))会构造一个NioSocketChannel,并将其缓存到buf中(buf是一个List<Object>)。该NioSocketChannel的模式为非阻塞,readInterestOp为SelectionKey.OP_READ,并创建对应的管道和NioByteUnsafe实例。

maxMessagesPerRead表示如果此时有多个connect,那么只有当SeverSocketChannel建立的已连接套接字个数超过maxMessagesPerRead后,才会对每个已连接套接字触发channelRead事件。maxMessagesPerRead的默认值是16. 

接下来分析channelRead事件做了什么事情: 

channelRead是Inbound事件,会调用ServerBootstrapAcceptor的channelRead方法:

Java代码  收藏代码
  1. // ServerBootstrapAcceptor  
  2.   public void channelRead(ChannelHandlerContext ctx, Object msg) {  
  3.             Channel child = (Channel) msg;  
  4.   
  5.             child.pipeline().addLast(childHandler);  
  6.   
  7.             for (Entry<ChannelOption<?>, Object> e: childOptions) {  
  8.                 try {  
  9.                     if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {  
  10.                         logger.warn("Unknown channel option: " + e);  
  11.                     }  
  12.                 } catch (Throwable t) {  
  13.                     logger.warn("Failed to set a channel option: " + child, t);  
  14.                 }  
  15.             }  
  16.   
  17.             for (Entry<AttributeKey<?>, Object> e: childAttrs) {  
  18.                 child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());  
  19.             }  
  20.   
  21.             try {  
  22.                 childGroup.register(child);  
  23.             } catch (Throwable t) {  
  24.                 child.unsafe().closeForcibly();  
  25.                 logger.warn("Failed to register an accepted channel: " + child, t);  
  26.             }  
  27.         }  

 首先child.pipeline().addLast(childHandler)将服务端main函数中实例化的ChannelInitializer加入到管道中,该处理器的initChannel方法会在channelRegistered事件触发时被调用

Java代码  收藏代码
  1. childHandler(new ChannelInitializer<SocketChannel>() {  
  2.                  @Override  
  3.                  public void initChannel(SocketChannel ch) throws Exception {  
  4.                      ch.pipeline().addLast(  
  5.                              //new LoggingHandler(LogLevel.INFO),  
  6.                              new EchoServerHandler());  
  7.                  }  
  8.              });  

 然后设置NioSocketchannel的一些属性,最后进行注册:childGroup.register(child)。 

这里采用的是childGroup,即worker线程池所在的Group,从Group中选择一个NioEventLoop,并启动其持有的worker线程,执行register0任务。

Java代码  收藏代码
  1. // AbstractUnsafe  
  2.   public final void register(EventLoop eventLoop, final ChannelPromise promise) {  
  3.             if (eventLoop == null) {  
  4.                 throw new NullPointerException("eventLoop");  
  5.             }  
  6.             if (isRegistered()) {  
  7.                 promise.setFailure(new IllegalStateException("registered to an event loop already"));  
  8.                 return;  
  9.             }  
  10.             if (!isCompatible(eventLoop)) {  
  11.                 promise.setFailure(  
  12.                         new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));  
  13.                 return;  
  14.             }  
  15.   
  16.             AbstractChannel.this.eventLoop = eventLoop;  
  17.   
  18.             if (eventLoop.inEventLoop()) {  
  19.                 register0(promise);  
  20.             } else {  
  21.                 try {  
  22.                     eventLoop.execute(new Runnable() {  
  23.                         @Override  
  24.                         public void run() {  
  25.                             register0(promise);  
  26.                         }  
  27.                     });  
  28.                 } catch (Throwable t) {  
  29.                     logger.warn(  
  30.                             "Force-closing a channel whose registration task was not accepted by an event loop: {}",  
  31.                             AbstractChannel.this, t);  
  32.                     closeForcibly();  
  33.                     closeFuture.setClosed();  
  34.                     promise.setFailure(t);  
  35.                 }  
  36.             }  
  37.         }  
  38.   
  39.         private void register0(ChannelPromise promise) {  
  40.             try {  
  41.                 // check if the channel is still open as it could be closed in the mean time when the register  
  42.                 // call was outside of the eventLoop  
  43.                 if (!ensureOpen(promise)) {  
  44.                     return;  
  45.                 }  
  46.                 doRegister();  
  47.                 registered = true;  
  48.                 promise.setSuccess();  
  49.                 pipeline.fireChannelRegistered();  
  50.                 if (isActive()) {  
  51.                     pipeline.fireChannelActive();  
  52.                 }  
  53.             } catch (Throwable t) {  
  54.                 // Close the channel directly to avoid FD leak.  
  55.                 closeForcibly();  
  56.                 closeFuture.setClosed();  
  57.                 if (!promise.tryFailure(t)) {  
  58.                     logger.warn(  
  59.                             "Tried to fail the registration promise, but it is complete already. " +  
  60.                                     "Swallowing the cause of the registration failure:", t);  
  61.                 }  
  62.             }  
  63.         }  

 此时worker线程就启动了。Register0任务在connect文章中已经描述,其主要功能就是将socketchannel注册到selector中;然后触发channelRegistered事件,调用ChannelInitializer的initChannel方法将服务端main函数中设置的处理器(本例为EchoServerHandler)加入到管道中,并将自己ChannelInitializer从管道中移除;最后触发channelActive事件,将ops设置为read。

Java代码  收藏代码
  1. // DefaultChannelPipeline  
  2.   public ChannelPipeline fireChannelActive() {  
  3.         head.fireChannelActive();  
  4.   
  5.         if (channel.config().isAutoRead()) {  
  6.             channel.read();  
  7.         }  
  8.   
  9.         return this;  
  10.     }  

         到此,worker线程对应的selector就开始监听该socketChannel上的read事件了。 

 

接下来继续分析boss线程的执行:

将本次readBuf中缓存的所有NioSocketChannel注册后,就将他们从readBuf中移除。然后触发ChannelReadComplete事件,

Java代码  收藏代码
  1. // DefaultChannelPipeline  
  2.  public ChannelPipeline fireChannelReadComplete() {  
  3.         head.fireChannelReadComplete();  
  4.         if (channel.config().isAutoRead()) {  
  5.             read();  
  6.         }  
  7.         return this;  
  8.     }  

 head.fireChannelReadComplete()触发的是一个inbound事件,没有做任何事情。接着分析后续触发的read事件,这是一个outbound事件,也没有做任何事情(将ops重新设置为OP_ACCEPT,其实本来就是OP_ACCEPT)。

到此,一次accept的流程就执行完了。

 

总结:

一次accept的流程发生了以下事情:

  1. 为接受的客户端连接建立一个已连接套接字,设置为非阻塞。基于已连接套接字实例化一个NioSocketChannel,设置readInterestOp为SelectionKey.OP_READ,为其创建管道,并实例化内部的NioByteUnsafe。
  2. 在触发ServerSocketChannel的管道的channelRead方法之前,一个ServerSocketChannel一次可以最多缓存maxMessagesPerRead(默认为16)个NioSocketChannel。
  3. channelRead是一个Inbound事件,做了以下几件事:调用ServerBootstrapAcceptor处理器的channelRead方法为NioSocketChannel的管道加入ChannelInitializer处理器(该处理器的initChannel方法会在channalRegistered事件被触发时调用,将EchoServerHandler加入到管道中);设置NioSocketChannel的属性;从worker线程池中启动一个worker线程,执行register0任务。
  4. register0任务做的事情是:将socketChannal注册到selector中,触发channelRegistered事件,调用ChannelInitializer的initChannel方法将main函数中设置的处理器(譬如:EchoServerHandler)加入到管道中,然后触发channelActive事件,最后里面触发read事件,将ops设置为read。到此,worker线程所属的NioEventLoop持有的selector就开始监听socketChannel的read事件了。
  5. 最后触发ChannelReadComplete(inbound)事件,里面又会触发read(outbound)事件,这两个事件均没有做任何实事。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多