本文为原创,转载请注明出处
netty4源码分析-accept
本文分析服务端如何accept客户端的connect请求,首先看下selector的I/O多路复用的分发逻辑:
- //NioEventLoop
- private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
- final NioUnsafe unsafe = ch.unsafe();
- if (!k.isValid()) {
- // close the channel if the key is not valid anymore
- unsafe.close(unsafe.voidPromise());
- return;
- }
- try {
- int readyOps = k.readyOps();
- if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
- unsafe.read();
- if (!ch.isOpen()) {
- // Connection already closed - no need to handle write.
- return;
- }
- }
- if ((readyOps & SelectionKey.OP_WRITE) != 0) {
- // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
- ch.unsafe().forceFlush();
- }
- if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
- // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
- // See https://github.com/netty/netty/issues/924
- int ops = k.interestOps();
- ops &= ~SelectionKey.OP_CONNECT;
- k.interestOps(ops);
-
- unsafe.finishConnect();
- }
- } catch (CancelledKeyException e) {
- unsafe.close(unsafe.voidPromise());
- }
- }
当有OP_ACCEPT事件到达时,分发给NioMessageUnsafe的read方法进行处理。
- //NioMessageUnsafe
- public void read() {
- assert eventLoop().inEventLoop();
- final SelectionKey key = selectionKey();
- if (!config().isAutoRead()) {
- int interestOps = key.interestOps();
- if ((interestOps & readInterestOp) != 0) {
- // only remove readInterestOp if needed
- key.interestOps(interestOps & ~readInterestOp);
- }
- }
- final ChannelConfig config = config();
- final int maxMessagesPerRead = config.getMaxMessagesPerRead();
- final boolean autoRead = config.isAutoRead();
- final ChannelPipeline pipeline = pipeline();
- boolean closed = false;
- Throwable exception = null;
- try {
- for (;;) {
- int localRead = doReadMessages(readBuf);
- if (localRead == 0) {
- break;
- }
- if (localRead < 0) {
- closed = true;
- break;
- }
- if (readBuf.size() >= maxMessagesPerRead | !autoRead) {
- break;
- }
- }
- } catch (Throwable t) {
- exception = t;
- }
- int size = readBuf.size();
- for (int i = 0; i < size; i ++) {
- pipeline.fireChannelRead(readBuf.get(i));
- }
- readBuf.clear();
- pipeline.fireChannelReadComplete();
- if (exception != null) {
- if (exception instanceof IOException) {
- // ServerChannel should not be closed even on IOException because it can often continue
- // accepting incoming connections. (e.g. too many open files)
- closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
- }
-
- pipeline.fireExceptionCaught(exception);
- }
-
- if (closed) {
- if (isOpen()) {
- close(voidPromise());
- }
- }
- }
- }
其中doReadMessages方法由NioServerSocketChannel实现:
- // NioServerSocketChannel
- protected int doReadMessages(List<Object> buf) throws Exception {
- SocketChannel ch = javaChannel().accept();
-
- try {
- if (ch != null) {
- buf.add(new NioSocketChannel(this, ch));
- return 1;
- }
- } catch (Throwable t) {
- logger.warn("Failed to create a new channel from an accepted socket.", t);
-
- try {
- ch.close();
- } catch (Throwable t2) {
- logger.warn("Failed to close a socket.", t2);
- }
- }
-
- return 0;
- }
SocketChannel ch = javaChannel().accept()就为接受的客户端连接建立了一个已连接套接字socketChannel.
buf.add(new NioSocketChannel(this, ch))会构造一个NioSocketChannel,并将其缓存到buf中(buf是一个List<Object>)。该NioSocketChannel的模式为非阻塞,readInterestOp为SelectionKey.OP_READ,并创建对应的管道和NioByteUnsafe实例。
maxMessagesPerRead表示如果此时有多个connect,那么只有当SeverSocketChannel建立的已连接套接字个数超过maxMessagesPerRead后,才会对每个已连接套接字触发channelRead事件。maxMessagesPerRead的默认值是16.
接下来分析channelRead事件做了什么事情:
channelRead是Inbound事件,会调用ServerBootstrapAcceptor的channelRead方法:
- // ServerBootstrapAcceptor
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- Channel child = (Channel) msg;
-
- child.pipeline().addLast(childHandler);
-
- for (Entry<ChannelOption<?>, Object> e: childOptions) {
- try {
- if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
- logger.warn("Unknown channel option: " + e);
- }
- } catch (Throwable t) {
- logger.warn("Failed to set a channel option: " + child, t);
- }
- }
-
- for (Entry<AttributeKey<?>, Object> e: childAttrs) {
- child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
- }
-
- try {
- childGroup.register(child);
- } catch (Throwable t) {
- child.unsafe().closeForcibly();
- logger.warn("Failed to register an accepted channel: " + child, t);
- }
- }
首先child.pipeline().addLast(childHandler)将服务端main函数中实例化的ChannelInitializer加入到管道中,该处理器的initChannel方法会在channelRegistered事件触发时被调用
- childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(
- //new LoggingHandler(LogLevel.INFO),
- new EchoServerHandler());
- }
- });
然后设置NioSocketchannel的一些属性,最后进行注册:childGroup.register(child)。
这里采用的是childGroup,即worker线程池所在的Group,从Group中选择一个NioEventLoop,并启动其持有的worker线程,执行register0任务。
- // AbstractUnsafe
- public final void register(EventLoop eventLoop, final ChannelPromise promise) {
- if (eventLoop == null) {
- throw new NullPointerException("eventLoop");
- }
- if (isRegistered()) {
- promise.setFailure(new IllegalStateException("registered to an event loop already"));
- return;
- }
- if (!isCompatible(eventLoop)) {
- promise.setFailure(
- new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
- return;
- }
-
- AbstractChannel.this.eventLoop = eventLoop;
-
- if (eventLoop.inEventLoop()) {
- register0(promise);
- } else {
- try {
- eventLoop.execute(new Runnable() {
- @Override
- public void run() {
- register0(promise);
- }
- });
- } catch (Throwable t) {
- logger.warn(
- "Force-closing a channel whose registration task was not accepted by an event loop: {}",
- AbstractChannel.this, t);
- closeForcibly();
- closeFuture.setClosed();
- promise.setFailure(t);
- }
- }
- }
-
- private void register0(ChannelPromise promise) {
- try {
- // check if the channel is still open as it could be closed in the mean time when the register
- // call was outside of the eventLoop
- if (!ensureOpen(promise)) {
- return;
- }
- doRegister();
- registered = true;
- promise.setSuccess();
- pipeline.fireChannelRegistered();
- if (isActive()) {
- pipeline.fireChannelActive();
- }
- } catch (Throwable t) {
- // Close the channel directly to avoid FD leak.
- closeForcibly();
- closeFuture.setClosed();
- if (!promise.tryFailure(t)) {
- logger.warn(
- "Tried to fail the registration promise, but it is complete already. " +
- "Swallowing the cause of the registration failure:", t);
- }
- }
- }
此时worker线程就启动了。Register0任务在connect文章中已经描述,其主要功能就是将socketchannel注册到selector中;然后触发channelRegistered事件,调用ChannelInitializer的initChannel方法将服务端main函数中设置的处理器(本例为EchoServerHandler)加入到管道中,并将自己ChannelInitializer从管道中移除;最后触发channelActive事件,将ops设置为read。
- // DefaultChannelPipeline
- public ChannelPipeline fireChannelActive() {
- head.fireChannelActive();
-
- if (channel.config().isAutoRead()) {
- channel.read();
- }
-
- return this;
- }
到此,worker线程对应的selector就开始监听该socketChannel上的read事件了。
接下来继续分析boss线程的执行:
将本次readBuf中缓存的所有NioSocketChannel注册后,就将他们从readBuf中移除。然后触发ChannelReadComplete事件,
- // DefaultChannelPipeline
- public ChannelPipeline fireChannelReadComplete() {
- head.fireChannelReadComplete();
- if (channel.config().isAutoRead()) {
- read();
- }
- return this;
- }
head.fireChannelReadComplete()触发的是一个inbound事件,没有做任何事情。接着分析后续触发的read事件,这是一个outbound事件,也没有做任何事情(将ops重新设置为OP_ACCEPT,其实本来就是OP_ACCEPT)。
到此,一次accept的流程就执行完了。
总结:
一次accept的流程发生了以下事情:
- 为接受的客户端连接建立一个已连接套接字,设置为非阻塞。基于已连接套接字实例化一个NioSocketChannel,设置readInterestOp为SelectionKey.OP_READ,为其创建管道,并实例化内部的NioByteUnsafe。
- 在触发ServerSocketChannel的管道的channelRead方法之前,一个ServerSocketChannel一次可以最多缓存maxMessagesPerRead(默认为16)个NioSocketChannel。
- channelRead是一个Inbound事件,做了以下几件事:调用ServerBootstrapAcceptor处理器的channelRead方法为NioSocketChannel的管道加入ChannelInitializer处理器(该处理器的initChannel方法会在channalRegistered事件被触发时调用,将EchoServerHandler加入到管道中);设置NioSocketChannel的属性;从worker线程池中启动一个worker线程,执行register0任务。
- register0任务做的事情是:将socketChannal注册到selector中,触发channelRegistered事件,调用ChannelInitializer的initChannel方法将main函数中设置的处理器(譬如:EchoServerHandler)加入到管道中,然后触发channelActive事件,最后里面触发read事件,将ops设置为read。到此,worker线程所属的NioEventLoop持有的selector就开始监听socketChannel的read事件了。
- 最后触发ChannelReadComplete(inbound)事件,里面又会触发read(outbound)事件,这两个事件均没有做任何实事。
|