配色: 字号:
记一次有趣的 Netty 源码问题
2017-01-19 | 阅:  转:  |  分享 
  
记一次有趣的Netty源码问题



背景



起因是一个朋友问我的一个关于ServerBootstrap启动的问题.

相关issue



他的问题我复述一下:

ServerBootstrap的绑定流程如下:



ServerBootstrap.bind->

AbstractBootstrap.bind->

AbstractBootstrap.doBind->

AbstractBootstrap.initAndRegister->

AbstractChannel#AbstractUnsafe.register->

eventLoop.execute(()->AbstractUnsafe.register0)

doBind0()->

channel.eventLoop().execute(()->channel.bind)->

AbstractUnsafe.bind

在AbstractUnsafe.register0中可能会调用pipeline.fireChannelActive(),即:



privatevoidregister0(ChannelPromisepromise){

try{

...

booleanfirstRegistration=neverRegistered;

doRegister();

...

if(firstRegistration&&isActive()){

pipeline.fireChannelActive();

}

}catch(Throwablet){

...

}

}

并且在AbstractUnsafe.bind中也会有pipeline.fireChannelActive()的调用,即:



publicfinalvoidbind(finalSocketAddresslocalAddress,finalChannelPromisepromise){

...

booleanwasActive=isActive();

try{

doBind(localAddress);

}catch(Throwablet){

...

}



if(!wasActive&&isActive()){

invokeLater(newOneTimeTask(){

@Override

publicvoidrun(){

pipeline.fireChannelActive();

}

});

}

...

}

那么有没有可能造成了两次的pipeline.fireChannelActive()调用?



我的回答是不会.为什么呢?对于直接想知道答案的朋友可以直接阅读到最后面的回答与总结两节..



下面我们就来根据代码详细分析一下.



分析



首先,根据我们上面所列出的调用流程,会有AbstractBootstrap.doBind的调用,它的代码如下:



privateChannelFuturedoBind(finalSocketAddresslocalAddress){

//步骤1

finalChannelFutureregFuture=initAndRegister();

...

//步骤2

if(regFuture.isDone()){

...

doBind0(regFuture,channel,localAddress,promise);

...

}else{

regFuture.addListener(newChannelFutureListener(){

@Override

publicvoidoperationComplete(ChannelFuturefuture)throwsException{

...

doBind0(regFuture,channel,localAddress,promise);

}

});

}

}

首先在doBind中,执行步骤1,即调用initAndRegister方法,这个方法会最终调用到AbstractChannel#AbstractUnsafe.register.而在AbstractChannel#AbstractUnsafe.register中,会通过eventLoop.execute的形式将AbstractUnsafe.register0的调用提交到任务队列中(即提交到eventLoop线程中,而当前代码所在的线程是main线程),即:



Override

publicfinalvoidregister(EventLoopeventLoop,finalChannelPromisepromise){

//当前线程是主线程,因此这个判断是false

if(eventLoop.inEventLoop()){

register0(promise);

}else{

try{

eventLoop.execute(newOneTimeTask(){

@Override

publicvoidrun(){

//register0在eventLoop线程中执行.

register0(promise);

}

});

}catch(Throwablet){

...

}

}

}

接着AbstractBootstrap.initAndRegister返回,回到AbstractBootstrap.doBind中,于是执行到步骤2.注意,因为AbstractUnsafe.register0是在eventLoop中执行的,因此有可能主线程执行到步骤2时,AbstractUnsafe.register0已经执行完毕了,此时必然有regFuture.isDone()==true;但也有可能AbstractUnsafe.register0没有来得及执行,因此此时regFuture.isDone()==false.所以上面的步骤2考虑到了这两种情况,因此分别针对这两种情况做了区分,即:



//步骤2

if(regFuture.isDone()){

...

doBind0(regFuture,channel,localAddress,promise);

...

}else{

regFuture.addListener(newChannelFutureListener(){

@Override

publicvoidoperationComplete(ChannelFuturefuture)throwsException{

...

doBind0(regFuture,channel,localAddress,promise);

}

});

}

一般情况下,regFuture.isDone()为false,因为绑定操作是比较费时的,因此很大几率会执行到else分支,并且if分支和else分支从结果上说没有不同,而且if分支逻辑还更简单一些,因此我们以else分支来分析吧.在else分支中,会为regFuture设置一个回调监听器.regFuture是一个ChannelFuture,而ChannelFuture代表了一个Channel的异步IO的操作结果,因此这里regFuture代表了Channel注册(register)的这个异步IO的操作结果.

Netty这里之所以要为regFuture设置一个回调监听器,是为了保证register和bind的时序上的正确性:Channel的注册必须要发生在Channel的绑定之前.

(关于时序的正确性的问题,我们在后面有证明)



接下来我们来看一下AbstractUnsafe.register0方法:



privatevoidregister0(ChannelPromisepromise){

try{

....

//neverRegistered一开始是true,因此firstRegistration==true

booleanfirstRegistration=neverRegistered;

doRegister();

neverRegistered=false;

registered=true;

safeSetSuccess(promise);

pipeline.fireChannelRegistered();

//OnlyfireachannelActiveifthechannelhasneverbeenregistered.Thispreventsfiring

//multiplechannelactivesifthechannelisderegisteredandre-registered.

//firstRegistration==true,而isActive()==false,

//因此不会执行到pipeline.fireChannelActive()

if(firstRegistration&&isActive()){

pipeline.fireChannelActive();

}

}catch(Throwablet){

//ClosethechanneldirectlytoavoidFDleak.

closeForcibly();

closeFuture.setClosed();

safeSetFailure(promise,t);

}

}

注意,我需要再强调一下,这里AbstractUnsafe.register0是在eventLoop中执行的.

AbstractUnsafe.register0中会调用doRegister()注册NioServerSocketChannel,然后调用safeSetSuccess()设置promise的状态为成功.而这个promise变量是什么呢?我将AbstractBootstrap.doBind的调用链写详细一些:



AbstractBootstrap.doBind->

AbstractBootstrap.initAndRegister->

MultithreadEventLoopGroup.register->

SingleThreadEventLoop.register->

AbstractChannel#AbstractUnsafe.register->

eventLoop.execute(()->AbstractUnsafe.register0)

在SingleThreadEventLoop.register中会实例化一个DefaultChannelPromise,即:



@Override

publicChannelFutureregister(Channelchannel){

returnregister(channel,newDefaultChannelPromise(channel,this));

}

接着调用重载的SingleThreadEventLoop.register方法:



@Override

publicChannelFutureregister(finalChannelchannel,finalChannelPromisepromise){

if(channel==null){

thrownewNullPointerException("channel");

}

if(promise==null){

thrownewNullPointerException("promise");

}



channel.unsafe().register(this,promise);

returnpromise;

}

我们看到,实例化的DefaultChannelPromise最终会以方法返回值的方式返回到调用方,即返回到AbstractBootstrap.doBind中:



finalChannelFutureregFuture=initAndRegister();

因此我们这里有一个共识:regFuture是一个在SingleThreadEventLoop.register中实例化的DefaultChannelPromise对象.



再回到SingleThreadEventLoop.register中,在这里会调用channel.unsafe().register(this,promise),将promise对象传递到AbstractChannel#AbstractUnsafe.register中,因此在AbstractUnsafe.register0中的promise就是AbstractBootstrap.doBind中的regFuture.

promise==regFuture很关键.



既然我们已经确定了promise的身份,那么调用的safeSetSuccess(promise);我们也知道是干嘛的了.safeSetSuccess方法设置一个Promise的状态为成功态,而Promise的成功态是最终状态,即此时promise.isDone()==true.那么设置promise为成功态后,会发生什么呢?

还记得不promise==regFuture,而我们在AbstractBootstrap.doBind的else分支中设置了一个回调监听器:



finalPendingRegistrationPromisepromise=newPendingRegistrationPromise(channel);

regFuture.addListener(newChannelFutureListener(){

@Override

publicvoidoperationComplete(ChannelFuturefuture)throwsException{

Throwablecause=future.cause();

if(cause!=null){

//RegistrationontheEventLoopfailedsofailtheChannelPromisedirectlytonotcausean

//IllegalStateExceptiononcewetrytoaccesstheEventLoopoftheChannel.

promise.setFailure(cause);

}else{

//Registrationwassuccessful,sosetthecorrectexecutortouse.

//Seehttps://github.com/netty/netty/issues/2586

promise.executor=channel.eventLoop();

}

doBind0(regFuture,channel,localAddress,promise);

}

});

因此当safeSetSuccess(promise);调用时,根据Netty的Promise/Future机制,会触发上面的operationComplete回调,在回调中调用doBind0方法:



privatestaticvoiddoBind0(

finalChannelFutureregFuture,finalChannelchannel,

finalSocketAddresslocalAddress,finalChannelPromisepromise){

//ThismethodisinvokedbeforechannelRegistered()istriggered.Giveuserhandlersachancetosetup

//thepipelineinitschannelRewww.tt951.comgistered()implementation.

channel.eventLoop().execute(newRunnable(){

@Override

publicvoidrun(){

if(regFuture.isSuccess()){

channel.bind(localAddress,promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

}else{

promise.setFailure(regFuture.cause());

}

}

});

}

注意到,有一个关键的地方,代码中将channel.bind的调用放到了eventLoop中执行.doBind0返回后,代码继续执行AbstractUnsafe.register0方法的剩余部分代码,即:



privatevoidregister0(ChannelPromisepromise){

try{

....

safeSetSuccess(promise);

//safeSetSuccess返回后,继续执行如下代码

pipeline.fireChannelRegistered();

//OnlyfireachannelActiveifthechannelhasneverbeenregistered.Thispreventsfiring

//multiplechannelactivesifthechannelisderegisteredandre-registered.

//firstRegistration==true,而isActive()==false,

//因此不会执行到pipeline.fireChannelActive()

if(firstRegistration&&isActive()){

pipeline.fireChannelActive();

}

}catch(Throwablet){

//ClosethechanneldirectlytoavoidFDleak.

closeForcibly();

closeFuture.setClosed();

safeSetFailure(promise,t);

}

}

当AbstractUnsafe.register0方法执行完毕后,才执行到channel.bind方法.



而channel.bind方法最终会调用到AbstractChannel#AbstractUnsafe.bind方法,源码如下:



@Override

publicfinalvoidbind(finalSocketAddresslocalAddress,finalChannelPromisepromise){

booleanwasActive=isActive();

logger.info("---wasActive:{}---",wasActive);



try{

//调用NioServerSocketChannel.bind方法,

//将底层的JavaNIOSocketChannel绑定到指定的端口.

//当SocketChannel绑定到端口后,isActive()才为真.

doBind(localAddress);

}catch(Throwablet){

...

}



booleanactiveNow=isActive();

logger.info("---activeNow:{}---",activeNow);



//这里wasActive==false

//isActive()==true

if(!wasActive&&isActive()){

invokeLater(newOneTimeTask(){

@Override

publicvoidrun(){

pipeline.fireChannelActive();

}

});

}



safeSetSuccess(promise);

}

上面的代码中,调用了doBind(localAddress)将底层的JavaNIOSocketChannel绑定到指定的端口.并且当SocketChannel绑定到端口后,isActive()才为真.

因此我们知道,如果SocketChannel第一次绑定时,在调用doBind前,wasActive==false==isActive(),而当调用了doBind后,isActive()==true,因此第一次绑定端口时,if判断成立,会调用pipeline.fireChannelActive().



关于Channel注册与绑定的时序问题



我们在前的分析中,直接认定了Channel注册在Channel的绑定之前完成,那么依据是什么呢?

其实所有的关键在于EventLoop的任务队列机制.

不要闲我啰嗦哦.我们需要继续回到AbstractUnsafe.register0的调用中(再次强调一下,在eventLoop线程中执行AbstractUnsafe.register0),这个方法我们已经分析了,它会调用safeSetSuccess(promise),并由Netty的Promise/Future机制,导致了AbstractBootstrap.doBind中的regFuture所设置的回调监听器的operatiwww.baiyuewang.netonComplete方法调用,而operationComplete中调用了AbstractBootstrap.doBind0:



privatestaticvoiddoBind0(

finalChannelFutureregFuture,finalChannelchannel,

finalSocketAddresslocalAddress,finalChannelPromisepromise){

channel.eventLoop().execute(newRunnable(){

@Override

publicvoidrun(){

if(regFuture.isSuccess()){

channel.bind(localAddress,promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

}else{

promise.setFailure(regFuture.cause());

}

}

});

}

在doBind0中,根据EventLoop的任务队列机制,会使用eventLoop().execute将channel.bind封装为一个Task,放到eventLoop的taskQueue中.

如下用一幅图表示上面的过程:







点此下载原图

而当channel.bind被调度时,AbstractUnsafe.register0早就已经调用结束了.



因此由于EventLoop的任务队列机制,我们知道,在执行AbstractUnsafe.register0时,是在EventLoop线程中的,而channel.bind的调用是以task的形式添加到taskQueue队列的末尾,因此必然是有EventLoop线程先执行完AbstractUnsafe.register0方法后,才有机会从taskQueue中取出一个task来执行,因此这个机制从根本上保证了Channel注册发生在绑定之前.



回答



你的疑惑是,AbstractChannel#AbstractUnsafe.register0中,可能会调用pipeline.fireChannelActive(),即:



privatevoidregister0(ChannelPromisepromise){

try{

...

booleanfirstRegistration=neverRegistered;

doRegister();

...

if(firstRegistration&&isActive()){

pipeline.fireChannelActive();

}

}catch(Throwablet){

...

}

}

并且在AbstractChannel#AbstractUnsafe.bind中也可能会调用到pipeline.fireChannelActive(),即:



publicfinalvoidbind(finalSocketAddresslocalAddress,finalChannelPromisepromise){

...

booleanwasActive=isActive();

try{

doBind(localAddress);

}catch(Throwablet){

...

}



if(!wasActive&&isActive()){

invokeLater(newOneTimeTask(){

@Override

publicvoidrun(){

pipeline.fireChannelActive();

}

});

}

...

}

我觉得是不会.因为根据上面我们分析的结果可知,Netty的Promise/Future与EventLoop的任务队列机制保证了NioServerSocketChannel的注册和Channel的绑定的时序:Channel的注册必须要发生在Channel的绑定之前,而当一个NioServerSocketChannel没有绑定到具体的端口前,它是不活跃的(Inactive),进而在register0中,if(firstRegistration&&isActive())就不成立,因此就不会执行到pipeline.fireChannelActive()了.

而执行完注册操作后,在AbstractChannel#AbstractUnsafe.bind才会调用pipeline.fireChannelActive(),因此最终只有一次fireChannelActive调用.



总结



有两点需要注意的:



isActive()==true成立的关键是此NioServerSocketChannel已经绑定到端口上了.

由Promise/Future与EventLoop机制,导致了Channel的注册发生在Channel的绑定之前,因此在AbstractChannel#AbstractUnsafe.register0中的isActive()==false,if判断不成立,最终就是register0中的pipeline.fireChannelActive()不会被调用.

献花(0)
+1
(本文系thedust79首藏)