Netty High-Performance RPC Server Optimization: Message Serialization
2016-11-09



In my previous article I covered the design ideas, design principles, and a concrete implementation plan for building a high-performance RPC server on top of Netty (see: How to Develop a High-Performance RPC Server with Netty). At the end of that article I noted that the RPC server built on that design still had room for performance optimization, so I spent a weekend refactoring and optimizing the original NettyRPC framework. The main changes are:



1. NettyRPC used to encode and decode RPC messages with Netty's built-in ObjectEncoder and ObjectDecoder, which are based on Java's native serialization mechanism. Published articles and benchmark data show that Java native serialization is slow and produces an oversized binary stream, so this round of optimization introduces the notion of an RPC message serialization protocol: a customized serialization/deserialization path for RPC messages backed by third-party codec frameworks, in this case Kryo and Hessian. To recap the underlying concepts: in a remote RPC call the message object has to travel over the network, so serialization turns the object into a byte stream, and at the other end deserialization turns the bytes back into a message object.
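To make the serialize/deserialize round trip concrete, here is a minimal, self-contained sketch (not part of NettyRPC; class and method names are invented for illustration) using Java's native mechanism. The byte count it prints hints at the stream overhead that motivates swapping in Kryo or Hessian:

```java
import java.io.*;

// What "serialization" means here: turn an object into bytes for the wire,
// then back into an object on the other side.
public class JdkSerializeDemo {

    // Serialize any Serializable object to a byte array.
    public static byte[] toBytes(Serializable obj) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(obj);
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Deserialize the byte array back into an object.
    public static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] wire = toBytes("hello rpc");
        // Note the overhead: the stream is noticeably larger than the payload itself,
        // one reason to consider a more compact codec.
        System.out.println("bytes on the wire: " + wire.length);
        System.out.println(fromBytes(wire));
    }
}
```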



2. Introduce the Google Guava concurrency library to re-organize and wrap NettyRPC's NIO thread pool and business thread pool.
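The server code later in the article decorates its business pool with Guava's ListeningExecutorService and attaches a FutureCallback. As a stdlib-only illustration of the same submit-then-callback idea (the names here are invented for the sketch, not Guava's API), CompletableFuture expresses it like this:

```java
import java.util.concurrent.*;

// Submit work to a business pool and react in a callback, so the submitting
// (NIO) thread is never blocked waiting for the business logic to finish.
public class AsyncCallbackDemo {

    public static String invoke(ExecutorService pool) {
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(() -> "result", pool);
        // thenApply runs when the task completes; join() here is only for the demo.
        return future.thenApply(r -> "sent:" + r).join();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        System.out.println(invoke(pool));
        pool.shutdown();
    }
}
```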



3. When using the third-party codec frameworks (Kryo, Hessian) under high concurrency, constantly creating and destroying serializer objects burns JVM memory and drags down the throughput of the whole RPC server, so object pooling is introduced. As is well known, creating and initializing a new object can be expensive; when large numbers of objects are needed, that cost shows up in performance. Beyond throwing hardware at the problem, object pooling is the silver bullet here, and the Apache Commons Pool framework is a solid implementation of it (project download page: http://commons.apache.org/proper/commons-pool/download_pool.cgi). The Hessian pooling in this article is a wrapper built on Apache Commons Pool.
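Before the Commons Pool based code appears later in the article, the borrow/return idea itself can be sketched in a few lines of plain Java. This is a toy pool written for illustration only, not the Commons Pool API:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// The core idea of object pooling: pre-create expensive objects once, then
// borrow and return them instead of new/discard on every request.
public class TinyPool<T> {

    public interface Factory<T> { T create(); }

    private final BlockingQueue<T> idle;

    public TinyPool(int size, Factory<T> factory) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.offer(factory.create());   // pay the construction cost up front
        }
    }

    public T borrow() {
        try {
            return idle.take();             // blocks if all objects are checked out
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public void restore(T obj) {
        idle.offer(obj);                    // hand the object back for reuse
    }
}
```

Callers must always return what they borrow (ideally in try/finally), the same borrow/restore discipline the Hessian pool in this article follows.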



This article focuses on the implementation approach of the refactored NettyRPC server along these three lines. First, take a quick look at the serialization protocols the optimized NettyRPC server now supports, shown in the figure below:







As you can see, the optimized NettyRPC supports three message serialization mechanisms: Kryo, Hessian, and Java native serialization. Java native serialization should already be familiar, so I will not repeat it here. Let's look at the other two:



1. Kryo serialization. Kryo is an efficient object serialization framework built specifically for Java; compared with Java's native serialization it is a large improvement in both processing speed and the size of the resulting byte stream. Many well-known open-source projects have adopted it, for example Alibaba's Dubbo RPC. This article uses kryo-3.0.3 (download: https://github.com/EsotericSoftware/kryo/releases/tag/kryo-parent-3.0.3). Why this version? As noted above, under high concurrency the frequent creation and destruction of serializer objects wastes JVM memory and time, and this Kryo release ships with a built-in serializer object-pool module (KryoFactory, KryoPool), so there is no need to wrap it again with Apache Commons Pool.



2. Hessian serialization. Hessian is itself a serialization protocol that is faster than Java's native serialization and produces smaller output. It transfers data in a binary format and currently has bindings for multiple languages. This article uses hessian-4.0.37 (download: http://hessian.caucho.com/#Java).



Next, look at the structure of the optimized NettyRPC message codec packages (newlandframework.netty.rpc.serialize.support, newlandframework.netty.rpc.serialize.support.kryo, newlandframework.netty.rpc.serialize.support.hessian), shown below:







The RPC request message structure is as follows:



/**
 * @filename: MessageRequest.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC service request structure
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.model;

import java.io.Serializable;
import org.apache.commons.lang.builder.ReflectionToStringBuilder;

public class MessageRequest implements Serializable {

    private String messageId;
    private String className;
    private String methodName;
    private Class[] typeParameters;
    private Object[] parametersVal;

    public String getMessageId() {
        return messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Class[] getTypeParameters() {
        return typeParameters;
    }

    public void setTypeParameters(Class[] typeParameters) {
        this.typeParameters = typeParameters;
    }

    public Object[] getParameters() {
        return parametersVal;
    }

    public void setParameters(Object[] parametersVal) {
        this.parametersVal = parametersVal;
    }

    public String toString() {
        return ReflectionToStringBuilder.toStringExclude(this, new String[]{"typeParameters", "parametersVal"});
    }
}

The RPC response message structure is as follows:



/**
 * @filename: MessageResponse.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC service response structure
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.model;

import java.io.Serializable;
import org.apache.commons.lang.builder.ReflectionToStringBuilder;

public class MessageResponse implements Serializable {

    private String messageId;
    private String error;
    private Object resultDesc;

    public String getMessageId() {
        return messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }

    public String getError() {
        return error;
    }

    public void setError(String error) {
        this.error = error;
    }

    public Object getResult() {
        return resultDesc;
    }

    public void setResult(Object resultDesc) {
        this.resultDesc = resultDesc;
    }

    public String toString() {
        return ReflectionToStringBuilder.toString(this);
    }
}

Now let's design the codec framework for the request and response messages above. NettyRPC currently supports Kryo, Hessian, and Java native serialization; for extensibility, the serialization protocol type is abstracted into its own object (RpcSerializeProtocol), implemented as follows:



/**
 * @filename: RpcSerializeProtocol.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC message serialization protocol type
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.serialize.support;

import org.apache.commons.lang.builder.ReflectionToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;

public enum RpcSerializeProtocol {

    // No cross-language RPC yet, so for now we only use third-party plugins that
    // serialize/deserialize within the Java language.
    // Serialization plugins currently known to NettyRPC: JDK native, Kryo, Hessian
    JDKSERIALIZE("jdknative"), KRYOSERIALIZE("kryo"), HESSIANSERIALIZE("hessian");

    private String serializeProtocol;

    private RpcSerializeProtocol(String serializeProtocol) {
        this.serializeProtocol = serializeProtocol;
    }

    public String toString() {
        ReflectionToStringBuilder.setDefaultStyle(ToStringStyle.SHORT_PREFIX_STYLE);
        return ReflectionToStringBuilder.toString(this);
    }

    public String getProtocol() {
        return serializeProtocol;
    }
}

On top of the different codec frameworks (Kryo and Hessian here), we further abstract an RPC message serialization/deserialization interface (RpcSerialize) and an RPC message codec interface (MessageCodecUtil).



/**
 * @filename: RpcSerialize.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC message serialization/deserialization interface
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.serialize.support;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public interface RpcSerialize {

    void serialize(OutputStream output, Object object) throws IOException;

    Object deserialize(InputStream input) throws IOException;
}

/**
 * @filename: MessageCodecUtil.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC message codec interface
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.serialize.support;

import io.netty.buffer.ByteBuf;
import java.io.IOException;

public interface MessageCodecUtil {

    // RPC message header length: 4 bytes
    final public static int MESSAGE_LENGTH = 4;

    public void encode(final ByteBuf out, final Object message) throws IOException;

    public Object decode(byte[] body) throws IOException;
}

Finally, the NettyRPC framework must let the RPC server and client freely choose which serialization to use when transporting RPC message objects over the network, so we abstract one more interface, the serialization protocol selector (RpcSerializeFrame):



/**
 * @filename: RpcSerializeFrame.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC message serialization protocol selector interface
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.serialize.support;

import io.netty.channel.ChannelPipeline;

public interface RpcSerializeFrame {

    public void select(RpcSerializeProtocol protocol, ChannelPipeline pipeline);
}

With the interfaces above in place, we can implement the Kryo- and Hessian-based RPC message serialization/deserialization modules. First, the overall class diagram:







First comes the RPC message encoder, MessageEncoder, which extends Netty's MessageToByteEncoder and turns an RPC message object into a binary stream:



/**
 * @filename: MessageEncoder.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC message encoder
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.serialize.support;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class MessageEncoder extends MessageToByteEncoder<Object> {

    private MessageCodecUtil util = null;

    public MessageEncoder(final MessageCodecUtil util) {
        this.util = util;
    }

    protected void encode(final ChannelHandlerContext ctx, final Object msg, final ByteBuf out) throws Exception {
        util.encode(out, msg);
    }
}

Next is the RPC message decoder, MessageDecoder, which extends Netty's ByteToMessageDecoder and deserializes the binary stream back into a message object. As mentioned in my previous article, NettyRPC runs over TCP, and TCP streams exhibit so-called "sticky packet" behavior (message boundaries are not preserved), so MessageDecoder must validate the length of the message body: if the readable bytes do not yet satisfy the length declared in the message header, we simply reset the ByteBuf reader index and wait for more data. The RPC message protocol can be parsed as follows:
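The header-length check and reader-index reset described above can also be illustrated independently of Netty with a small java.nio sketch (FrameReader and its names are invented for illustration, using ByteBuffer's mark/reset in place of Netty's markReaderIndex/resetReaderIndex):

```java
import java.nio.ByteBuffer;

// Length-prefixed framing rule: [4-byte length][body]. If the buffer does not
// yet hold a whole frame, rewind and wait for more bytes instead of reading
// half a message.
public class FrameReader {

    public static final int HEADER = 4;

    // Returns the frame body, or null if the buffer holds only a partial frame.
    public static byte[] tryReadFrame(ByteBuffer in) {
        if (in.remaining() < HEADER) {
            return null;                      // not even a full length header yet
        }
        in.mark();                            // remember where the header starts
        int len = in.getInt();
        if (in.remaining() < len) {
            in.reset();                       // partial body: rewind to the header
            return null;
        }
        byte[] body = new byte[len];
        in.get(body);
        return body;
    }
}
```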



/**
 * @filename: MessageDecoder.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC message decoder
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.serialize.support;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.io.IOException;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

public class MessageDecoder extends ByteToMessageDecoder {

    final public static int MESSAGE_LENGTH = MessageCodecUtil.MESSAGE_LENGTH;
    private MessageCodecUtil util = null;

    public MessageDecoder(final MessageCodecUtil util) {
        this.util = util;
    }

    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // Sticky/partial packet: not even a full message header yet, just return
        if (in.readableBytes() < MESSAGE_LENGTH) {
            return;
        }

        in.markReaderIndex();
        // Read the length of the message body
        int messageLength = in.readInt();

        if (messageLength < 0) {
            ctx.close();
        }

        // The readable bytes do not yet match the length declared in the header,
        // so reset the ByteBuf reader index and wait for more data
        if (in.readableBytes() < messageLength) {
            in.resetReaderIndex();
            return;
        } else {
            byte[] messageBody = new byte[messageLength];
            in.readBytes(messageBody);

            try {
                Object obj = util.decode(messageBody);
                out.add(obj);
            } catch (IOException ex) {
                Logger.getLogger(MessageDecoder.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }
}

Now let's implement the module that encodes and decodes RPC messages with Kryo. First, implement the NettyRPC serialization interface (RpcSerialize):



/**
 * @filename: KryoSerialize.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: Kryo serialization/deserialization implementation
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.serialize.support.kryo;

import newlandframework.netty.rpc.serialize.support.RpcSerialize;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.pool.KryoPool;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class KryoSerialize implements RpcSerialize {

    private KryoPool pool = null;

    public KryoSerialize(final KryoPool pool) {
        this.pool = pool;
    }

    public void serialize(OutputStream output, Object object) throws IOException {
        Kryo kryo = pool.borrow();
        Output out = new Output(output);
        kryo.writeClassAndObject(out, object);
        out.close();
        pool.release(kryo);
    }

    public Object deserialize(InputStream input) throws IOException {
        Kryo kryo = pool.borrow();
        Input in = new Input(input);
        Object result = kryo.readClassAndObject(in);
        in.close();
        pool.release(kryo);
        return result;
    }
}

Next, use the object pool that ships with Kryo to back the codec. First the Kryo object-pool factory (KryoPoolFactory), which is exactly why I chose kryo-3.0.3:



/**
 * @filename: KryoPoolFactory.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: Kryo object pool factory
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.serialize.support.kryo;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.pool.KryoFactory;
import com.esotericsoftware.kryo.pool.KryoPool;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;
import org.objenesis.strategy.StdInstantiatorStrategy;

public class KryoPoolFactory {

    private static KryoPoolFactory poolFactory = null;

    private KryoFactory factory = new KryoFactory() {
        public Kryo create() {
            Kryo kryo = new Kryo();
            kryo.setReferences(false);
            // Register the known message types with Kryo to speed up
            // serialization/deserialization
            kryo.register(MessageRequest.class);
            kryo.register(MessageResponse.class);
            kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
            return kryo;
        }
    };

    private KryoPool pool = new KryoPool.Builder(factory).build();

    private KryoPoolFactory() {
    }

    public static KryoPool getKryoPoolInstance() {
        if (poolFactory == null) {
            synchronized (KryoPoolFactory.class) {
                if (poolFactory == null) {
                    poolFactory = new KryoPoolFactory();
                }
            }
        }
        return poolFactory.getPool();
    }

    public KryoPool getPool() {
        return pool;
    }
}

KryoCodecUtil, the utility class that encodes and decodes RPC messages with Kryo, implements the MessageCodecUtil interface:



/**
 * @filename: KryoCodecUtil.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: Kryo codec utility class
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.serialize.support.kryo;

import com.esotericsoftware.kryo.pool.KryoPool;
import io.netty.buffer.ByteBuf;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import com.google.common.io.Closer;

public class KryoCodecUtil implements MessageCodecUtil {

    private KryoPool pool;

    public KryoCodecUtil(KryoPool pool) {
        this.pool = pool;
    }

    public void encode(final ByteBuf out, final Object message) throws IOException {
        // One Closer per call: Guava's Closer is not thread-safe, so it must
        // not be shared across Netty's handler threads
        Closer closer = Closer.create();
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            closer.register(byteArrayOutputStream);
            KryoSerialize kryoSerialization = new KryoSerialize(pool);
            kryoSerialization.serialize(byteArrayOutputStream, message);
            byte[] body = byteArrayOutputStream.toByteArray();
            int dataLength = body.length;
            out.writeInt(dataLength);
            out.writeBytes(body);
        } finally {
            closer.close();
        }
    }

    public Object decode(byte[] body) throws IOException {
        Closer closer = Closer.create();
        try {
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body);
            closer.register(byteArrayInputStream);
            KryoSerialize kryoSerialization = new KryoSerialize(pool);
            Object obj = kryoSerialization.deserialize(byteArrayInputStream);
            return obj;
        } finally {
            closer.close();
        }
    }
}

Finally, Kryo's own encoder and decoder just delegate to the encode and decode methods of the Kryo codec utility class (KryoCodecUtil):



/**
 * @filename: KryoDecoder.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: Kryo decoder
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.serialize.support.kryo;

import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import newlandframework.netty.rpc.serialize.support.MessageDecoder;

public class KryoDecoder extends MessageDecoder {

    public KryoDecoder(MessageCodecUtil util) {
        super(util);
    }
}

/**
 * @filename: KryoEncoder.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: Kryo encoder
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.serialize.support.kryo;

import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import newlandframework.netty.rpc.serialize.support.MessageEncoder;

public class KryoEncoder extends MessageEncoder {

    public KryoEncoder(MessageCodecUtil util) {
        super(util);
    }
}

Last, let's implement the Hessian-based encoder and decoder for RPC messages. First the Hessian serialization/deserialization implementation (HessianSerialize), which likewise implements the RpcSerialize interface:



/**
 * @filename: HessianSerialize.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: Hessian serialization/deserialization implementation
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.serialize.support.hessian;

import com.caucho.hessian.io.Hessian2Input;
import com.caucho.hessian.io.Hessian2Output;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import newlandframework.netty.rpc.serialize.support.RpcSerialize;

public class HessianSerialize implements RpcSerialize {

    public void serialize(OutputStream output, Object object) {
        Hessian2Output ho = new Hessian2Output(output);
        try {
            ho.startMessage();
            ho.writeObject(object);
            ho.completeMessage();
            ho.close();
            output.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public Object deserialize(InputStream input) {
        Object result = null;
        try {
            Hessian2Input hi = new Hessian2Input(input);
            hi.startMessage();
            result = hi.readObject();
            hi.completeMessage();
            hi.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }
}

Now apply object pooling to the Hessian serialization/deserialization class (HessianSerialize):



/**
 * @filename: HessianSerializeFactory.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: Hessian serializer pooled-object factory
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.serialize.support.hessian;

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;

public class HessianSerializeFactory extends BasePooledObjectFactory<HessianSerialize> {

    public HessianSerialize create() throws Exception {
        return createHessian();
    }

    public PooledObject<HessianSerialize> wrap(HessianSerialize hessian) {
        return new DefaultPooledObject<HessianSerialize>(hessian);
    }

    private HessianSerialize createHessian() {
        return new HessianSerialize();
    }
}

/**
 * @filename: HessianSerializePool.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: Hessian serializer pool
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.serialize.support.hessian;

import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

public class HessianSerializePool {

    // When Netty serializes with Hessian, an object pool is introduced to avoid
    // creating objects repeatedly and to improve JVM memory utilization; tests
    // show a clear throughput gain under highly concurrent
    // serialization/deserialization load.
    private GenericObjectPool<HessianSerialize> hessianPool;
    private static HessianSerializePool poolFactory = null;

    private HessianSerializePool() {
        hessianPool = new GenericObjectPool<HessianSerialize>(new HessianSerializeFactory());
    }

    public static HessianSerializePool getHessianPoolInstance() {
        if (poolFactory == null) {
            synchronized (HessianSerializePool.class) {
                if (poolFactory == null) {
                    poolFactory = new HessianSerializePool();
                }
            }
        }
        return poolFactory;
    }

    // Reserved constructor: the pool parameters can later be injected via
    // Spring PropertyPlaceholder
    public HessianSerializePool(final int maxTotal, final int minIdle, final long maxWaitMillis, final long minEvictableIdleTimeMillis) {
        hessianPool = new GenericObjectPool<HessianSerialize>(new HessianSerializeFactory());
        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
        // Maximum number of pooled objects
        config.setMaxTotal(maxTotal);
        // Minimum number of idle objects
        config.setMinIdle(minIdle);
        // Maximum wait time; the default of -1 means wait indefinitely
        config.setMaxWaitMillis(maxWaitMillis);
        // Minimum idle time before eviction; defaults to 1800000 ms
        config.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
        hessianPool.setConfig(config);
    }

    public HessianSerialize borrow() {
        try {
            return getHessianPool().borrowObject();
        } catch (final Exception ex) {
            ex.printStackTrace();
            return null;
        }
    }

    public void restore(final HessianSerialize object) {
        getHessianPool().returnObject(object);
    }

    public GenericObjectPool<HessianSerialize> getHessianPool() {
        return hessianPool;
    }
}

With the Hessian serializer pooled, the Hessian codec utility class "borrows" a HessianSerialize instance from the pool, and of course whatever is borrowed must be returned. The codec utility is implemented as follows:



/**
 * @filename: HessianCodecUtil.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: Hessian codec utility class
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.serialize.support.hessian;

import com.google.common.io.Closer;
import io.netty.buffer.ByteBuf;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;

public class HessianCodecUtil implements MessageCodecUtil {

    HessianSerializePool pool = HessianSerializePool.getHessianPoolInstance();

    public HessianCodecUtil() {
    }

    public void encode(final ByteBuf out, final Object message) throws IOException {
        // One Closer per call: Guava's Closer is not thread-safe, so it must
        // not be shared across Netty's handler threads
        Closer closer = Closer.create();
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            closer.register(byteArrayOutputStream);
            HessianSerialize hessianSerialization = pool.borrow();
            hessianSerialization.serialize(byteArrayOutputStream, message);
            byte[] body = byteArrayOutputStream.toByteArray();
            int dataLength = body.length;
            out.writeInt(dataLength);
            out.writeBytes(body);
            pool.restore(hessianSerialization);
        } finally {
            closer.close();
        }
    }

    public Object decode(byte[] body) throws IOException {
        Closer closer = Closer.create();
        try {
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body);
            closer.register(byteArrayInputStream);
            HessianSerialize hessianSerialization = pool.borrow();
            Object object = hessianSerialization.deserialize(byteArrayInputStream);
            pool.restore(hessianSerialization);
            return object;
        } finally {
            closer.close();
        }
    }
}

Finally, the reference implementations of the Hessian encoder and decoder for RPC messages:



/**
 * @filename: HessianDecoder.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: Hessian decoder
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.serialize.support.hessian;

import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import newlandframework.netty.rpc.serialize.support.MessageDecoder;

public class HessianDecoder extends MessageDecoder {

    public HessianDecoder(MessageCodecUtil util) {
        super(util);
    }
}

/**
 * @filename: HessianEncoder.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: Hessian encoder
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.serialize.support.hessian;

import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import newlandframework.netty.rpc.serialize.support.MessageEncoder;

public class HessianEncoder extends MessageEncoder {

    public HessianEncoder(MessageCodecUtil util) {
        super(util);
    }
}

At this point the Kryo and Hessian serialization protocol modules for NettyRPC are designed and implemented. Now we embed them into NettyRPC's core package (newlandframework.netty.rpc.core). Only the code adjusted by this optimization is shown below; for the other modules see my previous article: How to Develop a High-Performance RPC Server with Netty. First, the layout of the core package (newlandframework.netty.rpc.core):







Let's start with the NettyRPC server side. First, the server channel initializer (MessageRecvChannelInitializer); compared with the previous version, it now takes the serialization protocol object (RpcSerializeProtocol):



/**
 * @filename: MessageRecvChannelInitializer.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC server channel initializer
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import java.util.Map;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;

public class MessageRecvChannelInitializer extends ChannelInitializer<SocketChannel> {

    private RpcSerializeProtocol protocol;
    private RpcRecvSerializeFrame frame = null;

    MessageRecvChannelInitializer buildRpcSerializeProtocol(RpcSerializeProtocol protocol) {
        this.protocol = protocol;
        return this;
    }

    MessageRecvChannelInitializer(Map<String, Object> handlerMap) {
        frame = new RpcRecvSerializeFrame(handlerMap);
    }

    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        frame.select(protocol, pipeline);
    }
}

In the RPC server executor module (MessageRecvExecutor), the default serialization is Java's native mechanism, and the layering of the asynchronous thread-pool invocation has been cleaned up:



/**
 * @filename: MessageRecvExecutor.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC server executor module
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.logging.Level;
import newlandframework.netty.rpc.model.MessageKeyVal;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

public class MessageRecvExecutor implements ApplicationContextAware, InitializingBean {

    private String serverAddress;
    // JDK native serialization protocol by default
    private RpcSerializeProtocol serializeProtocol = RpcSerializeProtocol.JDKSERIALIZE;

    private final static String DELIMITER = ":";

    private Map<String, Object> handlerMap = new ConcurrentHashMap<String, Object>();

    private static ListeningExecutorService threadPoolExecutor;

    public MessageRecvExecutor(String serverAddress, String serializeProtocol) {
        this.serverAddress = serverAddress;
        this.serializeProtocol = Enum.valueOf(RpcSerializeProtocol.class, serializeProtocol);
    }

    public static void submit(Callable<Boolean> task, final ChannelHandlerContext ctx, final MessageRequest request, final MessageResponse response) {
        if (threadPoolExecutor == null) {
            synchronized (MessageRecvExecutor.class) {
                if (threadPoolExecutor == null) {
                    threadPoolExecutor = MoreExecutors.listeningDecorator((ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1));
                }
            }
        }

        ListenableFuture<Boolean> listenableFuture = threadPoolExecutor.submit(task);
        // The Netty server returns the computed result asynchronously
        Futures.addCallback(listenableFuture, new FutureCallback<Boolean>() {
            public void onSuccess(Boolean result) {
                ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        System.out.println("RPC Server Send message-id response: " + request.getMessageId());
                    }
                });
            }

            public void onFailure(Throwable t) {
                t.printStackTrace();
            }
        }, threadPoolExecutor);
    }

    public void setApplicationContext(ApplicationContext ctx) throws BeansException {
        try {
            MessageKeyVal keyVal = (MessageKeyVal) ctx.getBean(Class.forName("newlandframework.netty.rpc.model.MessageKeyVal"));
            Map<String, Object> rpcServiceObject = keyVal.getMessageKeyVal();

            Set<Map.Entry<String, Object>> s = rpcServiceObject.entrySet();
            Iterator<Map.Entry<String, Object>> it = s.iterator();
            Map.Entry<String, Object> entry;

            while (it.hasNext()) {
                entry = it.next();
                handlerMap.put(entry.getKey(), entry.getValue());
            }
        } catch (ClassNotFoundException ex) {
            java.util.logging.Logger.getLogger(MessageRecvExecutor.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public void afterPropertiesSet() throws Exception {
        // Use Netty's boss/worker (master/slave) thread-pool model so the server
        // can cope with highly concurrent requests.
        // Netty also supports single-threaded and multi-threaded network I/O
        // models, configurable to fit the business need.
        ThreadFactory threadRpcFactory = new NamedThreadFactory("NettyRPC ThreadFactory");

        // Twice the number of processors available to the Java virtual machine
        int parallel = Runtime.getRuntime().availableProcessors() * 2;

        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup(parallel, threadRpcFactory, SelectorProvider.provider());

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
                    .childHandler(new MessageRecvChannelInitializer(handlerMap).buildRpcSerializeProtocol(serializeProtocol))
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            String[] ipAddr = serverAddress.split(MessageRecvExecutor.DELIMITER);

            if (ipAddr.length == 2) {
                String host = ipAddr[0];
                int port = Integer.parseInt(ipAddr[1]);
                ChannelFuture future = bootstrap.bind(host, port).sync();
                System.out.printf("[author tangjie] Netty RPC Server start success!\nip: %s\nport: %d\nprotocol: %s\n\n", host, port, serializeProtocol);
                future.channel().closeFuture().sync();
            } else {
                System.out.printf("[author tangjie] Netty RPC Server start fail!\n");
            }
        } finally {
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
}

The RPC server message handler (MessageRecvHandler) is adjusted accordingly:



/**
 * @filename: MessageRecvHandler.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC server message handler
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Map;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;

public class MessageRecvHandler extends ChannelInboundHandlerAdapter {

    private final Map<String, Object> handlerMap;

    public MessageRecvHandler(Map<String, Object> handlerMap) {
        this.handlerMap = handlerMap;
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        MessageRequest request = (MessageRequest) msg;
        MessageResponse response = new MessageResponse();
        MessageRecvInitializeTask recvTask = new MessageRecvInitializeTask(request, response, handlerMap);
        // Never block the NIO thread: hand complex business logic to a dedicated thread pool
        MessageRecvExecutor.submit(recvTask, ctx, request, response);
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the channel on a network exception
        ctx.close();
    }
}

The RPC server task (MessageRecvInitializeTask) now has a single, simpler job: take the RPC request message, compute the result via reflection, and write it into the RPC response structure. The code is as follows:



/**
 * @filename: MessageRecvInitializeTask.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC server task
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import java.util.Map;
import java.util.concurrent.Callable;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;
import org.apache.commons.lang.reflect.MethodUtils;

public class MessageRecvInitializeTask implements Callable<Boolean> {

    private MessageRequest request = null;
    private MessageResponse response = null;
    private Map<String, Object> handlerMap = null;

    public MessageResponse getResponse() {
        return response;
    }

    public MessageRequest getRequest() {
        return request;
    }

    public void setRequest(MessageRequest request) {
        this.request = request;
    }

    MessageRecvInitializeTask(MessageRequest request, MessageResponse response, Map<String, Object> handlerMap) {
        this.request = request;
        this.response = response;
        this.handlerMap = handlerMap;
    }

    public Boolean call() {
        response.setMessageId(request.getMessageId());
        try {
            Object result = reflect(request);
            response.setResult(result);
            return Boolean.TRUE;
        } catch (Throwable t) {
            response.setError(t.toString());
            t.printStackTrace();
            System.err.printf("RPC Server invoke error!\n");
            return Boolean.FALSE;
        }
    }

    private Object reflect(MessageRequest request) throws Throwable {
        String className = request.getClassName();
        Object serviceBean = handlerMap.get(className);
        String methodName = request.getMethodName();
        Object[] parameters = request.getParameters();
        return MethodUtils.invokeMethod(serviceBean, methodName, parameters);
    }
}

As mentioned, the NettyRPC server can select a concrete serialization protocol; for now this is hard-coded, and it could later be injected via Spring IoC. The corresponding code:



/**
 * @filename: RpcRecvSerializeFrame.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC server-side message serialization protocol framework
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import java.util.Map;
import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import newlandframework.netty.rpc.serialize.support.RpcSerializeFrame;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;
import newlandframework.netty.rpc.serialize.support.hessian.HessianCodecUtil;
import newlandframework.netty.rpc.serialize.support.hessian.HessianDecoder;
import newlandframework.netty.rpc.serialize.support.hessian.HessianEncoder;
import newlandframework.netty.rpc.serialize.support.kryo.KryoCodecUtil;
import newlandframework.netty.rpc.serialize.support.kryo.KryoDecoder;
import newlandframework.netty.rpc.serialize.support.kryo.KryoEncoder;
import newlandframework.netty.rpc.serialize.support.kryo.KryoPoolFactory;

public class RpcRecvSerializeFrame implements RpcSerializeFrame {

    private Map<String, Object> handlerMap = null;

    public RpcRecvSerializeFrame(Map<String, Object> handlerMap) {
        this.handlerMap = handlerMap;
    }

    // Could later be refactored to inject the protocol via Spring IoC
    public void select(RpcSerializeProtocol protocol, ChannelPipeline pipeline) {
        switch (protocol) {
            case JDKSERIALIZE: {
                pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, MessageCodecUtil.MESSAGE_LENGTH, 0, MessageCodecUtil.MESSAGE_LENGTH));
                pipeline.addLast(new LengthFieldPrepender(MessageCodecUtil.MESSAGE_LENGTH));
                pipeline.addLast(new ObjectEncoder());
                pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                pipeline.addLast(new MessageRecvHandler(handlerMap));
                break;
            }
            case KRYOSERIALIZE: {
                KryoCodecUtil util = new KryoCodecUtil(KryoPoolFactory.getKryoPoolInstance());
                pipeline.addLast(new KryoEncoder(util));
                pipeline.addLast(new KryoDecoder(util));
                pipeline.addLast(new MessageRecvHandler(handlerMap));
                break;
            }
            case HESSIANSERIALIZE: {
                HessianCodecUtil util = new HessianCodecUtil();
                pipeline.addLast(new HessianEncoder(util));
                pipeline.addLast(new HessianDecoder(util));
                pipeline.addLast(new MessageRecvHandler(handlerMap));
                break;
            }
        }
    }
}

At this point, the server-side design and implementation of NettyRPC is complete.
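Before moving to the client, it is worth unpacking what the JDKSERIALIZE branch above actually does on the wire: LengthFieldPrepender writes a length header in front of every serialized message, and LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, MESSAGE_LENGTH, 0, MESSAGE_LENGTH) reads that header and strips it before the ObjectDecoder runs. Here is a minimal plain-Java sketch of that framing arithmetic, assuming MessageCodecUtil.MESSAGE_LENGTH is 4 (a 4-byte int header); no Netty dependency is needed:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;

public class FramingSketch {

    static final int MESSAGE_LENGTH = 4; // assumed value of MessageCodecUtil.MESSAGE_LENGTH

    // what LengthFieldPrepender does: prefix the payload with its length
    static byte[] frame(byte[] payload) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(payload.length); // 4-byte big-endian length header
        out.write(payload);
        return bos.toByteArray();
    }

    // what LengthFieldBasedFrameDecoder(..., 0, 4, 0, 4) does:
    // read the header at offset 0, strip it, and hand back the payload
    static byte[] unframe(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        int length = buf.getInt(); // lengthFieldOffset = 0, lengthFieldLength = 4
        byte[] payload = new byte[length]; // initialBytesToStrip = 4: skip the header
        buf.get(payload);
        return payload;
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = "rpc-message".getBytes("UTF-8");
        byte[] framed = frame(payload);
        System.out.println(framed.length == payload.length + MESSAGE_LENGTH); // prints true
        System.out.println(Arrays.equals(unframe(framed), payload)); // prints true
    }
}
```

The Kryo and Hessian branches do not install these two handlers because their encoders and decoders handle framing themselves.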



Now let's continue with the NettyRPC client modules. After the adjustment, the RPC client pipeline initializer (MessageSendChannelInitializer) likewise supports selecting a concrete message serialization protocol (RpcSerializeProtocol). The code is as follows:



/**
 * @filename: MessageSendChannelInitializer.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC client pipeline initializer
 * @author tangjie
 * @version 1.0
 */

package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;

public class MessageSendChannelInitializer extends ChannelInitializer<SocketChannel> {

    private RpcSerializeProtocol protocol;
    private RpcSendSerializeFrame frame = new RpcSendSerializeFrame();

    MessageSendChannelInitializer buildRpcSerializeProtocol(RpcSerializeProtocol protocol) {
        this.protocol = protocol;
        return this;
    }

    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        frame.select(protocol, pipeline);
    }
}

The RPC client execution module (MessageSendExecutor) is implemented as follows:



/**
 * @filename: MessageSendExecutor.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC client execution module
 * @author tangjie
 * @version 1.0
 */

package newlandframework.netty.rpc.core;

import com.google.common.reflect.Reflection;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;

public class MessageSendExecutor {

    private RpcServerLoader loader = RpcServerLoader.getInstance();

    public MessageSendExecutor() {
    }

    public MessageSendExecutor(String serverAddress, RpcSerializeProtocol serializeProtocol) {
        loader.load(serverAddress, serializeProtocol);
    }

    public void setRpcServerLoader(String serverAddress, RpcSerializeProtocol serializeProtocol) {
        loader.load(serverAddress, serializeProtocol);
    }

    public void stop() {
        loader.unLoad();
    }

    public static <T> T execute(Class<T> rpcInterface) {
        return (T) Reflection.newProxy(rpcInterface, new MessageSendProxy());
    }
}

In the RPC client connection task (MessageSendInitializeTask), the constructor gains a protocol-type parameter (RpcSerializeProtocol). The code is as follows:



/**
 * @filename: MessageSendInitializeTask.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC client connection task handling
 * @author tangjie
 * @version 1.0
 */

package newlandframework.netty.rpc.core;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
import java.util.concurrent.Callable;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;

public class MessageSendInitializeTask implements Callable<Boolean> {

    private EventLoopGroup eventLoopGroup = null;
    private InetSocketAddress serverAddress = null;
    private RpcSerializeProtocol protocol;

    MessageSendInitializeTask(EventLoopGroup eventLoopGroup, InetSocketAddress serverAddress, RpcSerializeProtocol protocol) {
        this.eventLoopGroup = eventLoopGroup;
        this.serverAddress = serverAddress;
        this.protocol = protocol;
    }

    public Boolean call() {
        Bootstrap b = new Bootstrap();
        b.group(eventLoopGroup)
         .channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true);
        b.handler(new MessageSendChannelInitializer().buildRpcSerializeProtocol(protocol));

        ChannelFuture channelFuture = b.connect(serverAddress);
        channelFuture.addListener(new ChannelFutureListener() {
            public void operationComplete(final ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    MessageSendHandler handler = channelFuture.channel().pipeline().get(MessageSendHandler.class);
                    RpcServerLoader.getInstance().setMessageSendHandler(handler);
                }
            }
        });
        return Boolean.TRUE;
    }
}

The RPC client message handling (MessageSendProxy), after the refactoring, looks like this:



/**
 * @filename: MessageSendProxy.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC client message handling
 * @author tangjie
 * @version 1.0
 */

package newlandframework.netty.rpc.core;

import java.lang.reflect.Method;
import java.util.UUID;
import newlandframework.netty.rpc.model.MessageRequest;
import com.google.common.reflect.AbstractInvocationHandler;

public class MessageSendProxy extends AbstractInvocationHandler {

    public Object handleInvocation(Object proxy, Method method, Object[] args) throws Throwable {
        MessageRequest request = new MessageRequest();
        request.setMessageId(UUID.randomUUID().toString());
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setTypeParameters(method.getParameterTypes());
        request.setParameters(args);

        MessageSendHandler handler = RpcServerLoader.getInstance().getMessageSendHandler();
        MessageCallBack callBack = handler.sendRequest(request);
        return callBack.start();
    }
}
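MessageSendProxy is the heart of the client-side call path: every invocation on the service interface is intercepted and turned into a MessageRequest. Guava's Reflection.newProxy / AbstractInvocationHandler used above is a thin convenience layer over the JDK's own dynamic-proxy machinery; here is a self-contained sketch of the same interception idea using only java.lang.reflect.Proxy. The Calculate interface and the local computation are illustrative stand-ins, not part of NettyRPC:

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.UUID;

public class ProxySketch {

    // illustrative service interface, not part of NettyRPC
    public interface Calculate {
        int add(int a, int b);
    }

    // build a proxy that gathers the same fields MessageSendProxy puts into a
    // MessageRequest, then (unlike the real framework) computes locally so the
    // sketch stays runnable without a server
    public static Calculate newCalcProxy() {
        return (Calculate) Proxy.newProxyInstance(
                Calculate.class.getClassLoader(),
                new Class<?>[]{Calculate.class},
                new InvocationHandler() {
                    public Object invoke(Object proxy, Method method, Object[] args) {
                        String messageId = UUID.randomUUID().toString();
                        String className = method.getDeclaringClass().getName();
                        String methodName = method.getName();
                        System.out.println(messageId + " -> " + className + "#" + methodName);
                        // a real proxy would now hand a MessageRequest to
                        // MessageSendHandler and block on MessageCallBack.start()
                        return (Integer) args[0] + (Integer) args[1];
                    }
                });
    }

    public static void main(String[] args) {
        System.out.println(newCalcProxy().add(2, 3)); // prints 5
    }
}
```

This is why the caller's code looks like an ordinary method call even though a network round trip happens underneath.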


Likewise, the NettyRPC client can choose its protocol type; note that the client and the server must use the same protocol type, or they cannot communicate. The client-side message serialization protocol framework is implemented as follows:



/**
 * @filename: RpcSendSerializeFrame.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC client message serialization protocol framework
 * @author tangjie
 * @version 1.0
 */

package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import newlandframework.netty.rpc.serialize.support.MessageCodecUtil;
import newlandframework.netty.rpc.serialize.support.hessian.HessianCodecUtil;
import newlandframework.netty.rpc.serialize.support.hessian.HessianDecoder;
import newlandframework.netty.rpc.serialize.support.hessian.HessianEncoder;
import newlandframework.netty.rpc.serialize.support.kryo.KryoCodecUtil;
import newlandframework.netty.rpc.serialize.support.kryo.KryoDecoder;
import newlandframework.netty.rpc.serialize.support.kryo.KryoEncoder;
import newlandframework.netty.rpc.serialize.support.kryo.KryoPoolFactory;
import newlandframework.netty.rpc.serialize.support.RpcSerializeFrame;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;

public class RpcSendSerializeFrame implements RpcSerializeFrame {

    // could later be refactored to inject these via Spring IoC
    public void select(RpcSerializeProtocol protocol, ChannelPipeline pipeline) {
        switch (protocol) {
            case JDKSERIALIZE: {
                pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, MessageCodecUtil.MESSAGE_LENGTH, 0, MessageCodecUtil.MESSAGE_LENGTH));
                pipeline.addLast(new LengthFieldPrepender(MessageCodecUtil.MESSAGE_LENGTH));
                pipeline.addLast(new ObjectEncoder());
                pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                pipeline.addLast(new MessageSendHandler());
                break;
            }
            case KRYOSERIALIZE: {
                KryoCodecUtil util = new KryoCodecUtil(KryoPoolFactory.getKryoPoolInstance());
                pipeline.addLast(new KryoEncoder(util));
                pipeline.addLast(new KryoDecoder(util));
                pipeline.addLast(new MessageSendHandler());
                break;
            }
            case HESSIANSERIALIZE: {
                HessianCodecUtil util = new HessianCodecUtil();
                pipeline.addLast(new HessianEncoder(util));
                pipeline.addLast(new HessianDecoder(util));
                pipeline.addLast(new MessageSendHandler());
                break;
            }
        }
    }
}

Finally, the NettyRPC client has to load some context information about the NettyRPC server, so the RPC server configuration loader (RpcServerLoader) is refactored as follows:



/**
 * @filename: RpcServerLoader.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC server configuration loader
 * @author tangjie
 * @version 1.0
 */

package newlandframework.netty.rpc.core;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.net.InetSocketAddress;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;

public class RpcServerLoader {

    private volatile static RpcServerLoader rpcServerLoader;
    private final static String DELIMITER = ":";
    // RPC messages are transported with JDK native serialization by default
    private RpcSerializeProtocol serializeProtocol = RpcSerializeProtocol.JDKSERIALIZE;

    // number of processors available to the JVM, times two
    private final static int parallel = Runtime.getRuntime().availableProcessors() * 2;
    // Netty NIO thread pool
    private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(parallel);
    private static ListeningExecutorService threadPoolExecutor = MoreExecutors.listeningDecorator((ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1));
    private MessageSendHandler messageSendHandler = null;

    // conditions used to wait until the Netty channel to the server is established
    private Lock lock = new ReentrantLock();
    private Condition connectStatus = lock.newCondition();
    private Condition handlerStatus = lock.newCondition();

    private RpcServerLoader() {
    }

    // double-checked locking singleton
    public static RpcServerLoader getInstance() {
        if (rpcServerLoader == null) {
            synchronized (RpcServerLoader.class) {
                if (rpcServerLoader == null) {
                    rpcServerLoader = new RpcServerLoader();
                }
            }
        }
        return rpcServerLoader;
    }

    public void load(String serverAddress, RpcSerializeProtocol serializeProtocol) {
        String[] ipAddr = serverAddress.split(RpcServerLoader.DELIMITER);
        if (ipAddr.length == 2) {
            String host = ipAddr[0];
            int port = Integer.parseInt(ipAddr[1]);
            final InetSocketAddress remoteAddr = new InetSocketAddress(host, port);

            ListenableFuture<Boolean> listenableFuture = threadPoolExecutor.submit(new MessageSendInitializeTask(eventLoopGroup, remoteAddr, serializeProtocol));

            // watch the async connect result, then decide whether to wake up all waiting client RPC threads
            Futures.addCallback(listenableFuture, new FutureCallback<Boolean>() {
                public void onSuccess(Boolean result) {
                    try {
                        lock.lock();

                        if (messageSendHandler == null) {
                            handlerStatus.await();
                        }

                        // Futures async callback: wake up all waiting RPC threads
                        if (result == Boolean.TRUE && messageSendHandler != null) {
                            connectStatus.signalAll();
                        }
                    } catch (InterruptedException ex) {
                        Logger.getLogger(RpcServerLoader.class.getName()).log(Level.SEVERE, null, ex);
                    } finally {
                        lock.unlock();
                    }
                }

                public void onFailure(Throwable t) {
                    t.printStackTrace();
                }
            }, threadPoolExecutor);
        }
    }

    public void setMessageSendHandler(MessageSendHandler messageInHandler) {
        try {
            lock.lock();
            this.messageSendHandler = messageInHandler;
            handlerStatus.signal();
        } finally {
            lock.unlock();
        }
    }

    public MessageSendHandler getMessageSendHandler() throws InterruptedException {
        try {
            lock.lock();
            // block until the channel to the Netty server has been established
            if (messageSendHandler == null) {
                connectStatus.await();
            }
            return messageSendHandler;
        } finally {
            lock.unlock();
        }
    }

    public void unLoad() {
        messageSendHandler.close();
        threadPoolExecutor.shutdown();
        eventLoopGroup.shutdownGracefully();
    }

    public void setSerializeProtocol(RpcSerializeProtocol serializeProtocol) {
        this.serializeProtocol = serializeProtocol;
    }
}
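The subtlest part of RpcServerLoader is the two-condition handshake: getMessageSendHandler() blocks callers until the connect listener has called setMessageSendHandler(), which signals the waiting side. The essence of that wait/notify pattern can be shown with a minimal, framework-free sketch (the class and names here are illustrative, and a String stands in for MessageSendHandler):

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class HandlerHolder {

    private final Lock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private String handler = null; // stands in for MessageSendHandler

    // called by the Netty connect listener once the channel is up
    public void set(String h) {
        lock.lock();
        try {
            handler = h;
            ready.signalAll(); // wake up every thread blocked in get()
        } finally {
            lock.unlock();
        }
    }

    // called by RPC client threads; blocks until the handler is installed
    public String get() throws InterruptedException {
        lock.lock();
        try {
            while (handler == null) { // loop also guards against spurious wakeups
                ready.await();
            }
            return handler;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws Exception {
        final HandlerHolder holder = new HandlerHolder();
        Thread connector = new Thread(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(100); // simulate connect latency
                } catch (InterruptedException ignored) {
                }
                holder.set("handler-ready");
            }
        });
        connector.start();
        System.out.println(holder.get()); // blocks briefly, then prints handler-ready
        connector.join();
    }
}
```

Note the while loop around await(): the canonical form re-checks the predicate and so also guards against spurious wakeups, which the single if check in getMessageSendHandler() above does not.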


At this point, all of NettyRPC's core modules have been presented. So how does the reworked NettyRPC server actually perform? As always, practice is the only test of truth, so let's start three NettyRPC servers and find out. The server configurations are:



1. JDK native serialization NettyRPC server, listening at 127.0.0.1:18887.



2. Kryo serialization NettyRPC server, listening at 127.0.0.1:18888.



3. Hessian serialization NettyRPC server, listening at 127.0.0.1:18889.



The Spring configuration file structure is shown below:







The configuration contents are as follows:



rpc-server-jdknative.properties

# rpc server's ip address config
rpc.server.addr=127.0.0.1:18887

rpc-server-kryo.properties

# rpc server's ip address config
rpc.server.addr=127.0.0.1:18888

rpc-server-hessian.properties

# rpc server's ip address config
rpc.server.addr=127.0.0.1:18889
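Each properties file carries a single rpc.server.addr entry in host:port form, which RpcServerLoader.load() later splits on the ":" delimiter. A small sketch of that parsing step using only the JDK (the inline properties string stands in for the file on the classpath):

```java
import java.io.IOException;
import java.io.StringReader;
import java.net.InetSocketAddress;
import java.util.Properties;

public class AddressConfigSketch {

    // mirrors RpcServerLoader.load(): "host:port" -> InetSocketAddress
    static InetSocketAddress parse(String serverAddress) {
        String[] ipAddr = serverAddress.split(":");
        if (ipAddr.length != 2) {
            throw new IllegalArgumentException("expected host:port, got " + serverAddress);
        }
        return new InetSocketAddress(ipAddr[0], Integer.parseInt(ipAddr[1]));
    }

    public static void main(String[] args) throws IOException {
        // stands in for rpc-server-kryo.properties
        Properties props = new Properties();
        props.load(new StringReader("rpc.server.addr=127.0.0.1:18888"));

        InetSocketAddress addr = parse(props.getProperty("rpc.server.addr"));
        System.out.println(addr.getHostString()); // prints 127.0.0.1
        System.out.println(addr.getPort()); // prints 18888
    }
}
```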

rpc-invoke-config-jdknative.xml







(The body of this Spring configuration file was lost when the page was captured; only the xmlns and xsi:schemaLocation declarations of its <beans> root element survived, showing a standard Spring beans definition using the beans and context schemas.)

rpc-invoke-config-kryo.xml







(The body of this Spring configuration file was lost when the page was captured; only the xmlns and xsi:schemaLocation declarations of its <beans> root element survived, showing a standard Spring beans definition using the beans and context schemas.)

rpc-invoke-config-hessian.xml







(The body of this Spring configuration file was lost when the page was captured; only the xmlns and xsi:schemaLocation declarations of its <beans> root element survived, showing a standard Spring beans definition using the beans and context schemas.)

The corresponding NettyRPC servers can then be started like this:



new ClassPathXmlApplicationContext("newlandframework/netty/rpc/config/rpc-invoke-config-jdknative.xml");

new ClassPathXmlApplicationContext("newlandframework/netty/rpc/config/rpc-invoke-config-kryo.xml");

new ClassPathXmlApplicationContext("newlandframework/netty/rpc/config/rpc-invoke-config-hessian.xml");

If everything goes well, the console prints startup messages for the JDK-native, Kryo, and Hessian serialization NettyRPC servers, as in the following screenshots:



First, the JDK native serialization NettyRPC server starting up:







Next, the Kryo serialization NettyRPC server starting up:







Finally, the Hessian serialization NettyRPC server starting up:







Now, using the same concurrency test case as in the previous article, we construct a sum-calculation RPC workload with an instantaneous parallelism of 10,000, send 10 batches of requests, and observe how many milliseconds each protocol (JDK native, Kryo, Hessian) spends encoding and decoding the RPC messages.



The test code is shown below:



/**
 * @filename: RpcParallelTest.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC concurrency test code
 * @author tangjie
 * @version 1.0
 */

package newlandframework.netty.rpc.servicebean;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import newlandframework.netty.rpc.core.MessageSendExecutor;
import newlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;
import org.apache.commons.lang.time.StopWatch;

public class RpcParallelTest {

    public static void parallelTask(MessageSendExecutor executor, int parallel, String serverAddress, RpcSerializeProtocol protocol) throws InterruptedException {
        // start the stopwatch
        StopWatch sw = new StopWatch();
        sw.start();

        CountDownLatch signal = new CountDownLatch(1);
        CountDownLatch finish = new CountDownLatch(parallel);

        for (int index = 0; index < parallel; index++) {
            CalcParallelRequestThread client = new CalcParallelRequestThread(executor, signal, finish, index);
            new Thread(client).start();
        }

        // release all 10000 request threads at once
        signal.countDown();
        finish.await();
        sw.stop();

        String tip = String.format("[%s] RPC calls took a total of [%s] ms", protocol, sw.getTime());
        System.out.println(tip);
    }

    // JDK native serialization protocol
    public static void JdkNativeParallelTask(MessageSendExecutor executor, int parallel) throws InterruptedException {
        String serverAddress = "127.0.0.1:18887";
        RpcSerializeProtocol protocol = RpcSerializeProtocol.JDKSERIALIZE;
        executor.setRpcServerLoader(serverAddress, protocol);
        RpcParallelTest.parallelTask(executor, parallel, serverAddress, protocol);
        TimeUnit.SECONDS.sleep(3);
    }

    // Kryo serialization protocol
    public static void KryoParallelTask(MessageSendExecutor executor, int parallel) throws InterruptedException {
        String serverAddress = "127.0.0.1:18888";
        RpcSerializeProtocol protocol = RpcSerializeProtocol.KRYOSERIALIZE;
        executor.setRpcServerLoader(serverAddress, protocol);
        RpcParallelTest.parallelTask(executor, parallel, serverAddress, protocol);
        TimeUnit.SECONDS.sleep(3);
    }

    // Hessian serialization protocol
    public static void HessianParallelTask(MessageSendExecutor executor, int parallel) throws InterruptedException {
        String serverAddress = "127.0.0.1:18889";
        RpcSerializeProtocol protocol = RpcSerializeProtocol.HESSIANSERIALIZE;
        executor.setRpcServerLoader(serverAddress, protocol);
        RpcParallelTest.parallelTask(executor, parallel, serverAddress, protocol);
        TimeUnit.SECONDS.sleep(3);
    }

    public static void main(String[] args) throws Exception {
        // parallelism of 10000
        int parallel = 10000;
        MessageSendExecutor executor = new MessageSendExecutor();

        for (int i = 0; i < 10; i++) {
            JdkNativeParallelTask(executor, parallel);
            KryoParallelTask(executor, parallel);
            HessianParallelTask(executor, parallel);
            System.out.printf("[author tangjie] NettyRPC server serialization round [%d] of concurrent validation finished!\n\n", i);
        }

        executor.stop();
    }
}

A screenshot of the run:







Now let's collect and compare the test data for each protocol's RPC message serialization/deserialization performance. (Note: due to hardware differences, your results may vary; these numbers are intended for learning and discussion only.)



The data from 10 rounds of stress testing is as follows:







Clearly, after the framework optimization described above, JDK native serialization performs noticeably better than the implementation in the previous article (less time is spent serializing/deserializing RPC messages). Across the 10 stress rounds, JDK native, Kryo, and Hessian each had one run that took longer than 10 s. The aggregated statistics are shown below:







Kryo and Hessian serialization are roughly on par, and both are better overall than JDK native serialization.
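A large part of JDK native serialization's handicap is stream size: the ObjectOutputStream format embeds full class descriptors, so even a tiny message serializes to a comparatively large byte array, and every extra byte costs encode, decode, and network time. A quick JDK-only way to see the overhead (the Request class is an illustrative stand-in for an RPC message):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SizeSketch {

    // illustrative stand-in for an RPC message
    public static class Request implements Serializable {
        private static final long serialVersionUID = 1L;
        public String methodName = "add";
        public int[] args = {1, 2};
    }

    public static byte[] jdkSerialize(Object obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bos);
        out.writeObject(obj);
        out.close();
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = jdkSerialize(new Request());
        // the logical payload is one short string and two ints, but the JDK
        // stream also carries the full class descriptor, so it is much larger
        System.out.println("JDK serialized size: " + bytes.length + " bytes");
        System.out.println(bytes.length > 50); // prints true
    }
}
```

Kryo and Hessian both avoid most of this per-message metadata, which is consistent with the measurements above.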



Next, look at how the latencies of JDK native, Kryo, and Hessian serialization fluctuate across the 10 rounds of stress testing:







Each of the three serialization methods shows a single "spike"; apart from it, the latencies are fairly stable. Overall, Kryo and Hessian fluctuate somewhat more noticeably, while JDK native serialization is steadier, without frequent oscillation, but consistently slower.



A final word: this article is the performance-tuning sequel to the previous post, "Building a high-performance RPC server with Netty". It reworked the Netty-based RPC server framework from the angles of the RPC message serialization mechanism, object pooling, and multithreading. Of course, the RPC servers are still each operating on their own. Can a cluster of them be coordinated and scheduled in a unified, distributed fashion? Certainly: one workable approach is to introduce Zookeeper for service governance. When time permits, I will keep improving the framework and present the results in another post. Given the limits of my knowledge and ability, the technical views, test data, and conclusions in this article are offered only for learning and exchange; if I have gotten anything wrong, corrections are very welcome!



Thank you for patiently reading this far. I hope that, having finished the article, you feel a bit more informed and confident about building high-performance server-side applications in Java. The road ahead is long, and the pursuit of software knowledge never ends; I offer that thought as mutual encouragement!
