How to Build a High-Performance RPC Server with Netty
2016-09-07
  



RPC (Remote Procedure Call Protocol) is a protocol for requesting a service from a program on a remote computer over a network, without having to understand the underlying network technology. Put more plainly: the client invokes an object running on a remote machine without knowing any of the call's details, as if it were calling a local object. Typical RPC frameworks include Thrift (open-sourced by Facebook) and Dubbo (open-sourced by Alibaba). An RPC framework's encapsulation of the network protocol and the network I/O model is transparent: the calling client believes it is invoking a local object, and does not care whether the transport layer uses TCP, UDP, or HTTP, nor whether the underlying I/O is implemented with select, poll, epoll, or IOCP (I/O Completion Ports).



Most mainstream RPC frameworks support cross-language calls via a so-called IDL (Interface Definition Language). Strictly speaking, an IDL is not something RPC itself requires: if your RPC framework has no cross-language requirement, you can leave it out.



Finally, it is worth noting that when judging an RPC framework's performance, the choice of network I/O model is crucial. An RPC server built on this foundation may support blocking synchronous I/O, non-blocking synchronous I/O, multiplexed I/O, or asynchronous I/O, and under high concurrency these models differ greatly in processing performance. Another yardstick is the transport protocol: whether it runs over TCP, HTTP, or UDP also affects performance. As far as I can tell, most open-source RPC frameworks are based on TCP or HTTP; none that I know of uses UDP as its primary transport.



Having understood how RPC works and what it demands of performance, can we now set the open-source frameworks aside and build a high-performance RPC server ourselves? I believe we can. In this article I will implement one in Java, on top of Netty.



How is it implemented, on what principles, and how well does it handle concurrency? Read on.



To raise the communication throughput and performance of a single node on a Java backend, the usual first choice is an NIO (non-blocking I/O) framework. But Java's raw NIO demands considerable technical depth and accumulated experience before it can be used with confidence. For a typical developer, writing a backend TCP/HTTP server with raw NIO while also handling TCP sticky packets, network exceptions, connection management, and other networking details sets the bar far too high. The sensible choice is one of the mainstream NIO frameworks, chiefly Netty or Mina. Both are designed around TCP, non-blocking I/O, and flexible I/O thread pools, and cope comfortably with high-concurrency workloads. As Netty and Mina matured, they gave high-performance Java backend development solid technical footing and broke C++'s former monopoly on server backends; before that, Java NIO was widely criticized and kept at arm's length.



Since this RPC server is based on Netty, a few words about Netty. Netty is a further encapsulation of the Java NIO framework; its home page is http://netty.io/. This article uses Netty 4.0, which can be downloaded from http://dl.bintray.com/netty/downloads/netty-4.0.37.Final.tar.bz2. You may ask: how do you build an RPC server with Netty? It is actually not hard. Briefly, the principle:



1. Define the RPC request and response message structures. They must include the RPC interface definition information: the class name of the remote call, the method name, the parameter types, and the parameter values.



2. At initialization, the server loads the mapping between RPC interface definitions and their implementation objects into a container, then waits for client call requests.



3. The RPC message the client sends contains the remote class name, method name, parameter types, and parameter values, and travels over the network to the RPC server as a byte stream. On receiving the request, the server looks up the concrete implementation object mapped to the client's interface in its container.



4. The RPC server takes the call's parameter information, invokes the target method on the mapped implementation object via reflection, and wraps the result in an RPC response message that is sent back to the client.
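Step 4, server-side dispatch, boils down to a single reflective call. A minimal sketch using the JDK reflection API (the service class and method names here are invented for illustration; the article's actual server uses Spring-managed beans and Commons BeanUtils):

```java
import java.lang.reflect.Method;

public class ReflectInvokeDemo {
    // Hypothetical service implementation, standing in for a Spring-managed bean.
    public static class CalculatorService {
        public int add(int a, int b) { return a + b; }
    }

    // Look up the method by name and parameter types, then invoke it on the bean.
    static Object invoke(Object serviceBean, String methodName,
                         Class<?>[] paramTypes, Object[] args) throws Exception {
        Method method = serviceBean.getClass().getMethod(methodName, paramTypes);
        return method.invoke(serviceBean, args);
    }

    public static void main(String[] args) throws Exception {
        Object result = invoke(new CalculatorService(), "add",
                new Class<?>[]{int.class, int.class}, new Object[]{2, 3});
        System.out.println(result);
    }
}
```

This is exactly the shape of the lookup the server performs: the request carries the method name and parameter types, and the container supplies the bean.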



5. The client receives the RPC response over the network as a byte stream, unpacks and decodes it, and displays the result of the remote call.



That sounds simple enough, but the implementation must also address the following issues:



1. The RPC server's transport layer is TCP, so what about sticky packets (messages fused together or split across reads)? Wouldn't the server then fail to parse client requests? Fortunately Netty already ships a decoder that solves TCP framing: LengthFieldBasedFrameDecoder handles it easily.
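Before looking at Netty's decoder, the idea behind length-field framing can be sketched with plain Java streams (a simplified illustration, not Netty's actual implementation): every message is written as a 4-byte length header followed by the payload, so the reader always knows where one message ends and the next begins, however TCP chunked the bytes:

```java
import java.io.*;
import java.nio.charset.StandardCharsets;

public class LengthFieldFramingDemo {
    // Write one frame: a 4-byte big-endian length, then the payload bytes.
    static void writeFrame(DataOutputStream out, byte[] payload) throws IOException {
        out.writeInt(payload.length);
        out.write(payload);
    }

    // Read one frame: first the 4-byte length, then exactly that many bytes.
    static byte[] readFrame(DataInputStream in) throws IOException {
        int length = in.readInt();
        byte[] payload = new byte[length];
        in.readFully(payload); // blocks until the whole frame has arrived
        return payload;
    }

    public static void main(String[] args) throws IOException {
        // Two messages "stuck together" in one byte stream, as TCP may deliver them.
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(wire);
        writeFrame(out, "hello".getBytes(StandardCharsets.UTF_8));
        writeFrame(out, "world".getBytes(StandardCharsets.UTF_8));

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire.toByteArray()));
        System.out.println(new String(readFrame(in), StandardCharsets.UTF_8));
        System.out.println(new String(readFrame(in), StandardCharsets.UTF_8));
    }
}
```

LengthFieldBasedFrameDecoder generalizes this pattern (configurable offset, length-field size, and bytes to strip), which is why the pipelines later in this article construct it with the parameters `(Integer.MAX_VALUE, 0, 4, 0, 4)`.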



2. Which thread model should the Netty server use: single-threaded; multi-threaded (one thread accepts client connections and, once established, hands them to a backend I/O thread pool); or the master/worker model (both connection acceptance and backend I/O handling are backed by thread pools)? For performance reasons, this article uses Netty's master/worker thread pool model.



3. What if Netty's I/O thread pool hits a very time-consuming piece of business logic and blocks? Wouldn't that easily hang the backend NIO threads? The approach taken here is to dispatch complex backend business work to a dedicated business thread pool and handle the result via asynchronous callbacks.



4. RPC messages travel as byte streams between NIO channels, but how is that implemented concretely? This article uses encoders and decoders based on Java's native object serialization (ObjectEncoder, ObjectDecoder). For performance this is probably not the optimal scheme; a better design makes the codec pluggable and configurable, for example protobuf or JBoss Marshalling, to improve the efficiency of message transfer.
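What ObjectEncoder and ObjectDecoder do is, in essence, Java native serialization combined with the length-field framing described earlier. The serialization round trip itself can be sketched with the JDK alone (an illustration of the mechanism, not Netty's code; the `Msg` class is a stand-in for MessageRequest):

```java
import java.io.*;

public class JdkSerializeDemo {
    // A minimal serializable message, standing in for MessageRequest.
    static class Msg implements Serializable {
        String methodName;
        Msg(String methodName) { this.methodName = methodName; }
    }

    // Encode an object to bytes with JDK serialization.
    static byte[] encode(Object obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        return bos.toByteArray();
    }

    // Decode bytes back into an object.
    static Object decode(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Msg decoded = (Msg) decode(encode(new Msg("add")));
        System.out.println(decoded.methodName);
    }
}
```

JDK serialization is verbose on the wire, which is precisely why swapping in protobuf or JBoss Marshalling pays off under load.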



5. An RPC server faces multithreaded, highly concurrent usage, so thread safety is mandatory. In addition, prefer the lighter-weight ReentrantLock to synchronized for conditionally guarding critical sections; the RPC message-handling callback in this article is one place where this is used.
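The callback mechanism alluded to here rests on a ReentrantLock paired with a Condition: the calling thread awaits, and the I/O thread signals when the response arrives. A stripped-down sketch of that hand-off (not the article's MessageCallBack itself, whose full code appears later):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionHandoffDemo {
    private final Lock lock = new ReentrantLock();
    private final Condition finished = lock.newCondition();
    private String response; // guarded by lock

    // Called by the waiting client thread; gives up after the timeout.
    public String awaitResponse(long timeoutMillis) throws InterruptedException {
        lock.lock();
        try {
            while (response == null) { // loop guards against spurious wakeups
                if (!finished.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    return null; // timed out
                }
            }
            return response;
        } finally {
            lock.unlock();
        }
    }

    // Called by the I/O thread when the server's answer arrives.
    public void complete(String value) {
        lock.lock();
        try {
            response = value;
            finished.signal();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionHandoffDemo demo = new ConditionHandoffDemo();
        new Thread(() -> demo.complete("pong")).start();
        System.out.println(demo.awaitResponse(1000));
    }
}
```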



6. The server's service interface objects and their implementation objects must be easy to configure, load, and unload. Here, object management is unified under a Spring container.



The overall flow: clients fire RPC call requests concurrently; the Netty acceptor dispatches N NIO connection threads, at which point the acceptor's job is done. Those connections are then managed uniformly by Netty's NIO worker thread pool, which performs message encoding, decoding, and processing for each RPC request connection. In the final handler stage, for performance, the design hands complex message processing straight to a dedicated RPC business thread pool, so the handler's NIO thread returns immediately and never blocks. At that point the RPC call is in flight, and the client waits asynchronously for the server's result, implemented here via a message callback mechanism (MessageCallBack).



The package newlandframework.netty.rpc.core is the core of NettyRPC. The package newlandframework.netty.rpc.model wraps the RPC request/response message structures and defines the container that binds RPC service interfaces to their implementations. newlandframework.netty.rpc.config defines the NettyRPC server's configuration file properties.



First, the contents of the newlandframework.netty.rpc.model package.

/**
 * @filename: MessageRequest.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC request message structure
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.model;

import java.io.Serializable;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;

public class MessageRequest implements Serializable {

    private String messageId;
    private String className;
    private String methodName;
    private Class[] typeParameters;
    private Object[] parametersVal;

    public String getMessageId() {
        return messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Class[] getTypeParameters() {
        return typeParameters;
    }

    public void setTypeParameters(Class[] typeParameters) {
        this.typeParameters = typeParameters;
    }

    public Object[] getParameters() {
        return parametersVal;
    }

    public void setParameters(Object[] parametersVal) {
        this.parametersVal = parametersVal;
    }

    public String toString() {
        return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
                .append("messageId", messageId).append("className", className)
                .append("methodName", methodName).toString();
    }
}

The RPC response message structure:



/**
 * @filename: MessageResponse.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC response message structure
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.model;

import java.io.Serializable;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;

public class MessageResponse implements Serializable {

    private String messageId;
    private String error;
    private Object resultDesc;

    public String getMessageId() {
        return messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }

    public String getError() {
        return error;
    }

    public void setError(String error) {
        this.error = error;
    }

    public Object getResult() {
        return resultDesc;
    }

    public void setResult(Object resultDesc) {
        this.resultDesc = resultDesc;
    }

    public String toString() {
        return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
                .append("messageId", messageId).append("error", error).toString();
    }
}

The container that binds RPC service interface definitions to their implementations, handed to Spring for use as a container:



/**
 * @filename: MessageKeyVal.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC service mapping container
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.model;

import java.util.Map;

public class MessageKeyVal {

    private Map messageKeyVal;

    public void setMessageKeyVal(Map messageKeyVal) {
        this.messageKeyVal = messageKeyVal;
    }

    public Map getMessageKeyVal() {
        return messageKeyVal;
    }
}

With the core model structures defined, here are the key parts of the NettyRPC core package newlandframework.netty.rpc.core, starting with the business thread pool classes:



Thread factory implementation:



/**
 * @filename: NamedThreadFactory.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: Thread factory
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {

    private static final AtomicInteger threadNumber = new AtomicInteger(1);

    private final AtomicInteger mThreadNum = new AtomicInteger(1);

    private final String prefix;

    private final boolean daemoThread;

    private final ThreadGroup threadGroup;

    public NamedThreadFactory() {
        this("rpcserver-threadpool-" + threadNumber.getAndIncrement(), false);
    }

    public NamedThreadFactory(String prefix) {
        this(prefix, false);
    }

    public NamedThreadFactory(String prefix, boolean daemo) {
        this.prefix = prefix + "-thread-";
        daemoThread = daemo;
        SecurityManager s = System.getSecurityManager();
        threadGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
    }

    public Thread newThread(Runnable runnable) {
        String name = prefix + mThreadNum.getAndIncrement();
        Thread ret = new Thread(threadGroup, runnable, name, 0);
        ret.setDaemon(daemoThread);
        return ret;
    }

    public ThreadGroup getThreadGroup() {
        return threadGroup;
    }
}

Business thread pool implementation:



/**
 * @filename: RpcThreadPool.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC thread pool wrapper
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RpcThreadPool {

    // A separate pool exists to absorb complex, I/O-heavy business work without
    // blocking Netty's handler threads. If the business logic is simple enough,
    // running it inside the Netty handler (ChannelInboundHandlerAdapter) is also fine.
    public static Executor getExecutor(int threads, int queues) {
        String name = "RpcThreadPool";
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>()
                        : (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name));
    }
}

/**
 * @filename: AbortPolicyWithReport.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: Thread pool rejection policy
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {

    private final String threadName;

    public AbortPolicyWithReport(String threadName) {
        this.threadName = threadName;
    }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("RpcServer["
                + " ThreadName: %s, PoolSize: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d),"
                + " Executor status: (isShutdown: %s, isTerminated: %s, isTerminating: %s)]",
                threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating());
        System.out.println(msg);
        throw new RejectedExecutionException(msg);
    }
}

RPC client implementation:



/**
 * @filename: MessageSendExecutor.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC client execution module
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import java.lang.reflect.Proxy;

public class MessageSendExecutor {

    private RpcServerLoader loader = RpcServerLoader.getInstance();

    public MessageSendExecutor(String serverAddress) {
        loader.load(serverAddress);
    }

    public void stop() {
        loader.unLoad();
    }

    public static <T> T execute(Class<T> rpcInterface) {
        return (T) Proxy.newProxyInstance(
                rpcInterface.getClassLoader(),
                new Class[]{rpcInterface},
                new MessageSendProxy<T>(rpcInterface)
        );
    }
}

The RPC client here is in fact a dynamic proxy over MessageSendProxy, using the JDK's native dynamic proxy mechanism; you could switch to CGLIB (Code Generation Library) instead. In my own tests, however, the CGLIB version threw NullPointerExceptions under high concurrency, while the JDK native proxy, under the same conditions, did not; at moderate concurrency both ran fine. Something to investigate further later. Without further ado, here is MessageSendProxy:
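For readers unfamiliar with JDK dynamic proxies, the mechanism MessageSendExecutor relies on can be shown in isolation (the interface and handler below are invented for illustration; a real RPC client would serialize the method and arguments in the handler and send them over the wire):

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class DynamicProxyDemo {
    // Hypothetical remote interface.
    public interface Greeter {
        String greet(String name);
    }

    public static void main(String[] args) {
        // The handler intercepts every method call made on the proxy object.
        InvocationHandler handler = (proxy, method, methodArgs) ->
                method.getName() + "(" + methodArgs[0] + ") intercepted";

        Greeter greeter = (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(),
                new Class<?>[]{Greeter.class},
                handler);

        System.out.println(greeter.greet("world"));
    }
}
```

MessageSendProxy plays exactly the role of `handler` here: instead of returning a string, it packages the call into a MessageRequest and ships it to the server.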



/**
 * @filename: MessageSendProxy.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC client message handling
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.UUID;
import newlandframework.netty.rpc.model.MessageRequest;

public class MessageSendProxy<T> implements InvocationHandler {

    private Class<T> cls;

    public MessageSendProxy(Class<T> cls) {
        this.cls = cls;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        MessageRequest request = new MessageRequest();
        request.setMessageId(UUID.randomUUID().toString());
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setTypeParameters(method.getParameterTypes());
        request.setParameters(args);

        MessageSendHandler handler = RpcServerLoader.getInstance().getMessageSendHandler();
        MessageCallBack callBack = handler.sendRequest(request);
        return callBack.start();
    }
}

Looking closer, MessageSendProxy actually hands the message off to the RpcServerLoader module, whose code follows:



/**
 * @filename: RpcServerLoader.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC server configuration loader
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.net.InetSocketAddress;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class RpcServerLoader {

    private volatile static RpcServerLoader rpcServerLoader;
    private final static String DELIMITER = ":";

    // Number of processors available to the JVM, times two
    private final static int parallel = Runtime.getRuntime().availableProcessors() * 2;

    // Netty NIO thread pool
    private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(parallel);
    private static ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1);
    private MessageSendHandler messageSendHandler = null;

    // Signals that the Netty client-server channel has been established
    private Lock lock = new ReentrantLock();
    private Condition signal = lock.newCondition();

    private RpcServerLoader() {
    }

    // Double-checked locking
    public static RpcServerLoader getInstance() {
        if (rpcServerLoader == null) {
            synchronized (RpcServerLoader.class) {
                if (rpcServerLoader == null) {
                    rpcServerLoader = new RpcServerLoader();
                }
            }
        }
        return rpcServerLoader;
    }

    public void load(String serverAddress) {
        String[] ipAddr = serverAddress.split(RpcServerLoader.DELIMITER);
        if (ipAddr.length == 2) {
            String host = ipAddr[0];
            int port = Integer.parseInt(ipAddr[1]);
            final InetSocketAddress remoteAddr = new InetSocketAddress(host, port);

            threadPoolExecutor.submit(new MessageSendInitializeTask(eventLoopGroup, remoteAddr, this));
        }
    }

    public void setMessageSendHandler(MessageSendHandler messageInHandler) {
        try {
            lock.lock();
            this.messageSendHandler = messageInHandler;
            // Wake up every client RPC thread waiting for the channel
            signal.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public MessageSendHandler getMessageSendHandler() throws InterruptedException {
        try {
            lock.lock();
            // Until the Netty channel is fully established, park and wait
            while (messageSendHandler == null) {
                signal.await();
            }
            return messageSendHandler;
        } finally {
            lock.unlock();
        }
    }

    public void unLoad() {
        messageSendHandler.close();
        threadPoolExecutor.shutdown();
        eventLoopGroup.shutdownGracefully();
    }
}

Now, in one go, the RPC client's message encoding, decoding, and handling modules:



/**
 * @filename: MessageSendInitializeTask.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC client connection task
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;

public class MessageSendInitializeTask implements Runnable {

    private EventLoopGroup eventLoopGroup = null;
    private InetSocketAddress serverAddress = null;
    private RpcServerLoader loader = null;

    MessageSendInitializeTask(EventLoopGroup eventLoopGroup, InetSocketAddress serverAddress, RpcServerLoader loader) {
        this.eventLoopGroup = eventLoopGroup;
        this.serverAddress = serverAddress;
        this.loader = loader;
    }

    public void run() {
        Bootstrap b = new Bootstrap();
        b.group(eventLoopGroup)
                .channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true);
        b.handler(new MessageSendChannelInitializer());

        ChannelFuture channelFuture = b.connect(serverAddress);
        channelFuture.addListener(new ChannelFutureListener() {
            public void operationComplete(final ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    MessageSendHandler handler = channelFuture.channel().pipeline().get(MessageSendHandler.class);
                    MessageSendInitializeTask.this.loader.setMessageSendHandler(handler);
                }
            }
        });
    }
}

/**
 * @filename: MessageSendChannelInitializer.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC client pipeline initialization
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

public class MessageSendChannelInitializer extends ChannelInitializer<SocketChannel> {

    // When ObjectDecoder's base class, the half-packet decoder
    // LengthFieldBasedFrameDecoder, handles TCP framing, the message starts
    // with a length field occupying 4 bytes. Kept at 4 here for compatibility.
    final public static int MESSAGE_LENGTH = 4;

    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        // Stay compatible with the frame format of ObjectDecoder's base class
        // LengthFieldBasedFrameDecoder, whose constructor is effectively
        // super(maxObjectSize, 0, 4, 0, 4)
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, MessageSendChannelInitializer.MESSAGE_LENGTH, 0, MessageSendChannelInitializer.MESSAGE_LENGTH));
        // LengthFieldPrepender fills in the length header expected by ObjectDecoder
        pipeline.addLast(new LengthFieldPrepender(MessageSendChannelInitializer.MESSAGE_LENGTH));
        pipeline.addLast(new ObjectEncoder());
        // weakCachingConcurrentResolver is chosen for concurrency; in the
        // general case cacheDisabled would do
        pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
        pipeline.addLast(new MessageSendHandler());
    }
}

/**
 * @filename: MessageSendHandler.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC client handler module
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;

public class MessageSendHandler extends ChannelInboundHandlerAdapter {

    private ConcurrentHashMap<String, MessageCallBack> mapCallBack = new ConcurrentHashMap<String, MessageCallBack>();

    private volatile Channel channel;
    private SocketAddress remoteAddr;

    public Channel getChannel() {
        return channel;
    }

    public SocketAddress getRemoteAddr() {
        return remoteAddr;
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.remoteAddr = this.channel.remoteAddress();
    }

    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
        this.channel = ctx.channel();
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        MessageResponse response = (MessageResponse) msg;
        String messageId = response.getMessageId();
        MessageCallBack callBack = mapCallBack.get(messageId);
        if (callBack != null) {
            mapCallBack.remove(messageId);
            callBack.over(response);
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    public void close() {
        channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    public MessageCallBack sendRequest(MessageRequest request) {
        MessageCallBack callBack = new MessageCallBack(request);
        mapCallBack.put(request.getMessageId(), callBack);
        channel.writeAndFlush(request);
        return callBack;
    }
}

Finally, the server side. First, Spring auto-loads the RPC service interfaces, binds the interface-to-implementation container, and initializes Netty's master/worker thread pools; all of this is done by the MessageRecvExecutor module:



/**
 * @filename: MessageRecvExecutor.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC server execution module
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.logging.Level;
import newlandframework.netty.rpc.model.MessageKeyVal;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

public class MessageRecvExecutor implements ApplicationContextAware, InitializingBean {

    private String serverAddress;
    private final static String DELIMITER = ":";

    private Map handlerMap = new ConcurrentHashMap();

    private static ThreadPoolExecutor threadPoolExecutor;

    public MessageRecvExecutor(String serverAddress) {
        this.serverAddress = serverAddress;
    }

    public static void submit(Runnable task) {
        if (threadPoolExecutor == null) {
            synchronized (MessageRecvExecutor.class) {
                if (threadPoolExecutor == null) {
                    threadPoolExecutor = (ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1);
                }
            }
        }
        threadPoolExecutor.submit(task);
    }

    public void setApplicationContext(ApplicationContext ctx) throws BeansException {
        try {
            MessageKeyVal keyVal = (MessageKeyVal) ctx.getBean(Class.forName("newlandframework.netty.rpc.model.MessageKeyVal"));
            Map rpcServiceObject = keyVal.getMessageKeyVal();

            Set s = rpcServiceObject.entrySet();
            Iterator<Map.Entry> it = s.iterator();
            Map.Entry entry;

            while (it.hasNext()) {
                entry = it.next();
                handlerMap.put(entry.getKey(), entry.getValue());
            }
        } catch (ClassNotFoundException ex) {
            java.util.logging.Logger.getLogger(MessageRecvExecutor.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public void afterPropertiesSet() throws Exception {
        // Configure Netty's thread model as master/worker pools to cope with
        // high-concurrency requests; Netty also supports single-threaded and
        // multi-threaded I/O models, configurable to the business need
        ThreadFactory threadRpcFactory = new NamedThreadFactory("NettyRPCThreadFactory");

        // Number of processors available to the JVM, times two
        int parallel = Runtime.getRuntime().availableProcessors() * 2;

        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup(parallel, threadRpcFactory, SelectorProvider.provider());

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
                    .childHandler(new MessageRecvChannelInitializer(handlerMap))
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            String[] ipAddr = serverAddress.split(MessageRecvExecutor.DELIMITER);

            if (ipAddr.length == 2) {
                String host = ipAddr[0];
                int port = Integer.parseInt(ipAddr[1]);
                ChannelFuture future = bootstrap.bind(host, port).sync();
                System.out.printf("[author tangjie] Netty RPC Server start success ip:%s port:%d\n", host, port);
                future.channel().closeFuture().sync();
            } else {
                System.out.printf("[author tangjie] Netty RPC Server start fail!\n");
            }
        } finally {
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
}

As usual, here are the core server-side message encoding, decoding, and handling modules:



/**
 * @filename: MessageRecvChannelInitializer.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC server pipeline initialization
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import java.util.Map;

public class MessageRecvChannelInitializer extends ChannelInitializer<SocketChannel> {

    // When ObjectDecoder's base class, the half-packet decoder
    // LengthFieldBasedFrameDecoder, handles TCP framing, the message starts
    // with a length field occupying 4 bytes. Kept at 4 here for compatibility.
    final public static int MESSAGE_LENGTH = 4;
    private Map handlerMap = null;

    MessageRecvChannelInitializer(Map handlerMap) {
        this.handlerMap = handlerMap;
    }

    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        // Stay compatible with the frame format of ObjectDecoder's base class
        // LengthFieldBasedFrameDecoder, whose constructor is effectively
        // super(maxObjectSize, 0, 4, 0, 4)
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, MessageRecvChannelInitializer.MESSAGE_LENGTH, 0, MessageRecvChannelInitializer.MESSAGE_LENGTH));
        // LengthFieldPrepender fills in the length header expected by ObjectDecoder
        pipeline.addLast(new LengthFieldPrepender(MessageRecvChannelInitializer.MESSAGE_LENGTH));
        pipeline.addLast(new ObjectEncoder());
        // weakCachingConcurrentResolver is chosen for concurrency; in the
        // general case cacheDisabled would do
        pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
        pipeline.addLast(new MessageRecvHandler(handlerMap));
    }
}

/**
 * @filename: MessageRecvHandler.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC server message handling
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Map;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;

public class MessageRecvHandler extends ChannelInboundHandlerAdapter {

    private final Map handlerMap;

    public MessageRecvHandler(Map handlerMap) {
        this.handlerMap = handlerMap;
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        MessageRequest request = (MessageRequest) msg;
        MessageResponse response = new MessageResponse();
        MessageRecvInitializeTask recvTask = new MessageRecvInitializeTask(request, response, handlerMap, ctx);
        // Never block the NIO thread: hand complex business logic to the dedicated pool
        MessageRecvExecutor.submit(recvTask);
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the channel on any network exception
        ctx.close();
    }
}

/**
 * @filename: MessageRecvInitializeTask.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC server message task handling
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import java.util.Map;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;
import org.apache.commons.beanutils.MethodUtils;

public class MessageRecvInitializeTask implements Runnable {

    private MessageRequest request = null;
    private MessageResponse response = null;
    private Map handlerMap = null;
    private ChannelHandlerContext ctx = null;

    public MessageResponse getResponse() {
        return response;
    }

    public MessageRequest getRequest() {
        return request;
    }

    public void setRequest(MessageRequest request) {
        this.request = request;
    }

    MessageRecvInitializeTask(MessageRequest request, MessageResponse response, Map handlerMap, ChannelHandlerContext ctx) {
        this.request = request;
        this.response = response;
        this.handlerMap = handlerMap;
        this.ctx = ctx;
    }

    public void run() {
        response.setMessageId(request.getMessageId());
        try {
            Object result = reflect(request);
            response.setResult(result);
        } catch (Throwable t) {
            response.setError(t.toString());
            t.printStackTrace();
            System.err.printf("RPC Server invoke error!\n");
        }

        ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                System.out.println("RPC Server Send message-id response: " + request.getMessageId());
            }
        });
    }

    private Object reflect(MessageRequest request) throws Throwable {
        String className = request.getClassName();
        Object serviceBean = handlerMap.get(className);
        String methodName = request.getMethodName();
        Object[] parameters = request.getParameters();
        return MethodUtils.invokeMethod(serviceBean, methodName, parameters);
    }
}

Next, the RPC message-handling callback module:



/**
 * @filename: MessageCallBack.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC message callback
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;

public class MessageCallBack {

    private MessageRequest request;
    private MessageResponse response;
    private Lock lock = new ReentrantLock();
    private Condition finish = lock.newCondition();

    public MessageCallBack(MessageRequest request) {
        this.request = request;
    }

    public Object start() throws InterruptedException {
        try {
            lock.lock();
            // Set a timeout: if the RPC server takes too long to respond,
            // fall back to returning null
            finish.await(10 * 1000, TimeUnit.MILLISECONDS);
            if (this.response != null) {
                return this.response.getResult();
            } else {
                return null;
            }
        } finally {
            lock.unlock();
        }
    }

    public void over(MessageResponse reponse) {
        try {
            lock.lock();
            finish.signal();
            this.response = reponse;
        } finally {
            lock.unlock();
        }
    }
}

With that, the key parts of NettyRPC, both the server and client modules, are fully implemented on Netty. Here is the Spring configuration loaded from rpc-invoke-config.xml:



<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context.xsd">

    <context:property-placeholder location="classpath:newlandframework/netty/rpc/config/rpc-server.properties"/>

    <bean id="rpcbean" class="newlandframework.netty.rpc.servicebean.CalculateImpl"/>

    <bean id="rpcServiceObject" class="newlandframework.netty.rpc.model.MessageKeyVal">
        <property name="messageKeyVal">
            <map>
                <entry key="newlandframework.netty.rpc.servicebean.Calculate" value-ref="rpcbean"/>
            </map>
        </property>
    </bean>

    <bean id="rpcServer" class="newlandframework.netty.rpc.core.MessageRecvExecutor">
        <constructor-arg value="${rpc.server.addr}"/>
    </bean>
</beans>

And the configuration file that binds the RPC server's IP address, rpc-server.properties:



# rpc server's ip address config
rpc.server.addr=127.0.0.1:18888

Finally, the NettyRPC server is started like this:



new ClassPathXmlApplicationContext("newlandframework/netty/rpc/config/rpc-invoke-config.xml");

If all goes well, the console will print the startup line from MessageRecvExecutor:

[author tangjie] Netty RPC Server start success ip:127.0.0.1 port:18888

If you see it, the NettyRPC server has started successfully.



So how does this Netty-based RPC server perform under concurrency? Practice is the sole criterion of truth, so let's put it to the test.



The test case below makes an RPC call to a function that adds two numbers and returns the result. The client opens 10,000 threads that all fire their requests at the same instant, and we observe whether the Netty RPC server responds normally and whether every client gets its computed result back. Note that the test is designed around 10,000 threads firing concurrently at one instant, not 10,000 requests issued in a loop; as measures of an RPC server's concurrent processing capacity these two differ greatly, the former being far more demanding.



First, the RPC calculation interface and its implementation class:



/**
 * @filename: Calculate.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: Calculator interface
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.servicebean;

public interface Calculate {
    // Add two numbers
    int add(int a, int b);
}

/**
 * @filename: CalculateImpl.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: Calculator interface implementation
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.servicebean;

public class CalculateImpl implements Calculate {
    // Add two numbers
    public int add(int a, int b) {
        return a + b;
    }
}

And the instantaneous-concurrency RPC request test:



/**
 * @filename: CalcParallelRequestThread.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: Concurrent thread simulation
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.servicebean;

import newlandframework.netty.rpc.core.MessageSendExecutor;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Level;
import java.util.logging.Logger;

public class CalcParallelRequestThread implements Runnable {

    private CountDownLatch signal;
    private CountDownLatch finish;
    private MessageSendExecutor executor;
    private int taskNumber = 0;

    public CalcParallelRequestThread(MessageSendExecutor executor, CountDownLatch signal, CountDownLatch finish, int taskNumber) {
        this.signal = signal;
        this.finish = finish;
        this.taskNumber = taskNumber;
        this.executor = executor;
    }

    public void run() {
        try {
            signal.await();

            Calculate calc = executor.execute(Calculate.class);
            int add = calc.add(taskNumber, taskNumber);
            System.out.println("calc add result:[" + add + "]");

            finish.countDown();
        } catch (InterruptedException ex) {
            Logger.getLogger(CalcParallelRequestThread.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}

/**
 * @filename: RpcParallelTest.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC concurrency test code
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.servicebean;

import java.util.concurrent.CountDownLatch;
import newlandframework.netty.rpc.core.MessageSendExecutor;
import org.apache.commons.lang.time.StopWatch;

public class RpcParallelTest {

    public static void main(String[] args) throws Exception {
        final MessageSendExecutor executor = new MessageSendExecutor("127.0.0.1:18888");
        // Degree of parallelism: 10000
        int parallel = 10000;

        // Start timing
        StopWatch sw = new StopWatch();
        sw.start();

        CountDownLatch signal = new CountDownLatch(1);
        CountDownLatch finish = new CountDownLatch(parallel);

        for (int index = 0; index < parallel; index++) {
            CalcParallelRequestThread client = new CalcParallelRequestThread(executor, signal, finish, index);
            new Thread(client).start();
        }

        // Release all 10000 threads to fire their requests at one instant
        signal.countDown();
        finish.await();

        sw.stop();

        String tip = String.format("Total RPC call time: [%s] ms", sw.getTime());
        System.out.println(tip);

        executor.stop();
    }
}
