如何使用Netty开发实现高性能的RPC服务器
RPC(RemoteProcedureCallProtocol)远程过程调用协议,它是一种通过网络,从远程计算机程序上请求服务,而不必了解底层网络技术的协议。说的再直白一点,就是客户端在不必知道调用细节的前提之下,调用远程计算机上运行的某个对象,使用起来就像调用本地的对象一样。目前典型的RPC实现框架有:Thrift(facebook开源)、Dubbo(alibaba开源)等等。RPC框架针对网络协议、网络I/O模型的封装是透明的,对于调用的客户端而言,它就认为自己在调用本地的一个对象。至于传输层上,运用的是TCP协议、UDP协议、亦或是HTTP协议,一概不关心。从网络I/O模型上来看,是基于select、poll、epoll方式、还是IOCP(I/OCompletionPort)方式承载实现的,对于调用者而言也不用关心。
目前,主流的RPC框架都支持跨语言调用,即有所谓的IDL(接口定义语言),其实,这个并不是RPC所必须要求的。如果你的RPC框架没有跨语言的要求,IDL就可以不用包括了。
最后,值得一提的是,衡量一个RPC框架性能的好坏与否,RPC的网络I/O模型的选择,至关重要。在此基础上,设计出来的RPC服务器,可以考虑支持阻塞式同步IO、非阻塞式同步IO、当然还有所谓的多路复用IO模型、异步IO模型。支持不同的网络IO模型,在高并发的状态下,处理性能上会有很大的差别。还有一个衡量的标准,就是选择的传输协议。是基于TCP协议、还是HTTP协议、还是UDP协议?对性能也有一定的影响。但是从我目前了解的情况来看,大多数RPC开源实现框架都是基于TCP、或者HTTP的,目测没有采用UDP协议做为主要的传输协议的。
明白了RPC的使用原理和性能要求。现在,我们能不能撇开那些RPC开源框架,自己动手开发一个高性能的RPC服务器呢?我想,还是可以的。现在本人就使用Java,基于Netty,开发实现一个高性能的RPC服务器。
如何实现、基于什么原理?并发处理性能如何?请继续接着看下文。
我们有的时候,为了提高单个节点的通信吞吐量,提高通信性能。如果是基于Java后端的,一般首选的是NIO框架(No-blockIO)。但是问题也来了,Java的NIO掌握起来要相当的技术功底,和足够的技术积累,使用起来才能得心应手。一般的开发人员,如果要使用NIO开发一个后端的TCP/HTTP服务器,附带考虑TCP粘包、网络通信异常、消息链接处理等等网络通信细节,开发门槛太高,所以比较明智的选择是,采用业界主流的NIO框架进行服务器后端开发。主流的NIO框架主要有Netty、Mina。它们主要都是基于TCP通信,非阻塞的IO、灵活的IO线程池而设计的,应对高并发请求也是绰绰有余。随着Netty、Mina这样优秀的NIO框架,设计上日趋完善,Java后端高性能服务器开发,在技术上提供了有力的支持保障,从而打破了C++在服务器后端,一统天下的局面。因为在此之前,Java的NIO一直受人诟病,让人敬而远之!
既然,这个RPC服务器是基于Netty的,那就在说说Netty吧。实际上Netty是对JAVANIO框架的再次封装,它的开源网址是http://netty.io/,本文中使用的Netty版本是:4.0版本,可以通过http://dl.bintray.com/netty/downloads/netty-4.0.37.Final.tar.bz2,进行下载使用。那也许你会问,如何使用Netty进行RPC服务器的开发呢?实际不难,下面我就简单的说明一下技术原理:
1、定义RPC请求消息、应答消息结构,里面要包括RPC的接口定义模块、包括远程调用的类名、方法名称、参数结构、参数值等信息。
2、服务端初始化的时候通过容器加载RPC接口定义和RPC接口实现类对象的映射关系,然后等待客户端发起调用请求。
3、客户端发起的RPC消息里面包含,远程调用的类名、方法名称、参数结构、参数值等信息,通过网络,以字节流的方式送给RPC服务端,RPC服务端接收到字节流的请求之后,去对应的容器里面,查找客户端接口映射的具体实现对象。
4、RPC服务端找到实现对象的参数信息,通过反射机制创建该对象的实例,并返回调用处理结果,最后封装成RPC应答消息通知到客户端。
5、客户端通过网络,收到字节流形式的RPC应答消息,进行拆包、解析之后,显示远程调用结果。
上面说的是很简单,但是实现的时候,我们还要考虑如下的问题:
1、RPC服务器的传输层是基于TCP协议的,出现粘包咋办?这样客户端的请求,服务端不是会解析失败?好在Netty里面已经提供了解决TCP粘包问题的解码器:LengthFieldBasedFrameDecoder,可以靠它轻松搞定TCP粘包问题。
2、Netty服务端的线程模型是单线程、多线程(一个线程负责客户端连接,连接成功之后,丢给后端IO的线程池处理)、还是主从模式(客户端连接、后端IO处理都是基于线程池的实现)。当然在这里,我出于性能考虑,使用了Netty主从线程池模型。
3、Netty的IO处理线程池,如果遇到非常耗时的业务,出现阻塞了咋办?这样不是很容易把后端的NIO线程给挂死、阻塞?本文的处理方式是,对于复杂的后端业务,分派到专门的业务线程池里面,进行异步回调处理。
4、RPC消息的传输是通过字节流在NIO的通道(Channel)之间传输,那具体如何实现呢?本文,是通过基于Java原生对象序列化机制的编码、解码器(ObjectEncoder、ObjectDecoder)进行实现的。当然出于性能考虑,这个可能不是最优的方案。更优的方案是把消息的编码、解码器,搞成可以配置实现的。具体比如可以通过:protobuf、JBossMarshalling方式进行解码和编码,以提高网络消息的传输效率。
5、RPC服务器要考虑多线程、高并发的使用场景,所以线程安全是必须的。此外尽量不要使用synchronized进行加锁,改用轻量级的ReentrantLock方式进行代码块的条件加锁。比如本文中的RPC消息处理回调,就有这方面的使用。
6、RPC服务端的服务接口对象和服务接口实现对象要能轻易的配置,轻松进行加载、卸载。在这里,本文是通过Spring容器进行统一的对象管理。
客户端并发发起RPC调用请求,然后RPC服务端使用Netty连接器,分派出N个NIO连接线程,这个时候Netty连接器的任务结束。然后NIO连接线程是统一放到NettyNIO处理线程池进行管理,这个线程池里面会对具体的RPC请求连接进行消息编码、消息解码、消息处理等等一系列操作。最后进行消息处理(Handler)的时候,处于性能考虑,这里的设计是,直接把复杂的消息处理过程,丢给专门的RPC业务处理线程池集中处理,然后Handler对应的NIO线程就立即返回、不会阻塞。这个时候RPC调用结束,客户端会异步等待服务端消息的处理结果,本文是通过消息回调机制实现(MessageCallBack)。
其中newlandframework.netty.rpc.core包是NettyRPC的核心实现。newlandframework.netty.rpc.model包里面,则封装了RPC消息请求、应答报文结构,以及RPC服务接口与实现绑定关系的容器定义。newlandframework.netty.rpc.config里面定义了NettyRPC的服务端文件配置属性。
下面先来看下newlandframework.netty.rpc.model包中定义的内容。
复制代码
/
@filename:MessageRequest.java
NewlandCo.Ltd.Allrightsreserved.
@Description:rpc服务请求结构
@authortangjie
@version1.0
/
packagenewlandframework.netty.rpc.model;
importjava.io.Serializable;
importorg.apache.commons.lang.builder.ToStringBuilder;
importorg.apache.commons.lang.builder.ToStringStyle;
publicclassMessageRequestimplementsSerializable{
privateStringmessageId;
privateStringclassName;
privateStringmethodName;
privateClass>[]typeParameters;
privateObject[]parametersVal;
publicStringgetMessageId(){
returnmessageId;
}
publicvoidsetMessageId(StringmessageId){
this.messageId=messageId;
}
publicStringgetClassName(){
returnclassName;
}
publicvoidsetClassName(StringclassName){
this.className=className;
}
publicStringgetMethodName(){
returnmethodName;
}
publicvoidsetMethodName(StringmethodName){
this.methodName=methodName;
}
publicClass>[]getTypeParameters(){
returntypeParameters;
}
publicvoidsetTypeParameters(Class>[]typeParameters){
this.typeParameters=typeParameters;
}
publicObject[]getParameters(){
returnparametersVal;
}
publicvoidsetParameters(Object[]parametersVal){
this.parametersVal=parametersVal;
}
publicStringtoString(){
returnnewToStringBuilder(this,ToStringStyle.SHORT_PREFIX_STYLE)
.append("messageId",messageId).append("className",className)
.append("methodName",methodName).toString();
}
}
复制代码
RPC应答消息结构
复制代码
/
@filename:MessageResponse.java
NewlandCo.Ltd.Allrightsreserved.
@Description:rpc服务应答结构
@authortangjie
@version1.0
/
packagenewlandframework.netty.rpc.model;
importjava.io.Serializable;
importorg.apache.commons.lang.builder.ToStringBuilder;
importorg.apache.commons.lang.builder.ToStringStyle;
publicclassMessageResponseimplementsSerializable{
privateStringmessageId;
privateStringerror;
privateObjectresultDesc;
publicStringgetMessageId(){
returnmessageId;
}
publicvoidsetMessageId(StringmessageId){
this.messageId=messageId;
}
publicStringgetError(){
returnerror;
}
publicvoidsetError(Stringerror){
this.error=error;
}
publicObjectgetResult(){
returnresultDesc;
}
publicvoidsetResult(ObjectresultDesc){
this.resultDesc=resultDesc;
}
publicStringtoString(){
returnnewToStringBuilder(this,ToStringStyle.SHORT_PREFIX_STYLE)
.append("messageId",messageId).append("error",error).toString();
}
}
复制代码
RPC服务接口定义、服务接口实现绑定关系容器定义,提供给spring作为容器使用。
复制代码
/
@filename:MessageKeyVal.java
NewlandCo.Ltd.Allrightsreserved.
@Description:rpc服务映射容器
@authortangjie
@version1.0
/
packagenewlandframework.netty.rpc.model;
importjava.util.Map;
publicclassMessageKeyVal{
privateMapmessageKeyVal;
publicvoidsetMessageKeyVal(MapmessageKeyVal){
this.messageKeyVal=messageKeyVal;
}
publicMapgetMessageKeyVal(){
returnmessageKeyVal;
}
}
复制代码
好了,定义好核心模型结构之后,现在再向大家展示一下NettyRPC核心包:newlandframework.netty.rpc.core的关键部分实现代码,首先是业务线程池相关类的实现代码,具体如下:
线程工厂定义实现
复制代码
/
@filename:NamedThreadFactory.java
NewlandCo.Ltd.Allrightsreserved.
@Description:线程工厂
@authortangjie
@version1.0
/
packagenewlandframework.netty.rpc.core;
importjava.util.concurrent.ThreadFactory;
importjava.util.concurrent.atomic.AtomicInteger;
publicclassNamedThreadFactoryimplementsThreadFactory{
privatestaticfinalAtomicIntegerthreadNumber=newAtomicInteger(1);
privatefinalAtomicIntegermThreadNum=newAtomicInteger(1);
privatefinalStringprefix;
privatefinalbooleandaemoThread;
privatefinalThreadGroupthreadGroup;
publicNamedThreadFactory(){
this("rpcserver-threadpool-"+threadNumber.getAndIncrement(),false);
}
publicNamedThreadFactory(Stringprefix){
this(prefix,false);
}
publicNamedThreadFactory(Stringprefix,booleandaemo){
this.prefix=prefix+"-thread-";
daemoThread=daemo;
SecurityManagers=System.getSecurityManager();
threadGroup=(s==null)?Thread.currentThread().getThreadGroup():s.getThreadGroup();
}
publicThreadnewThread(Runnablerunnable){
Stringname=prefix+mThreadNum.getAndIncrement();
Threadret=newThread(threadGroup,runnable,name,0);
ret.setDaemon(daemoThread);
returnret;
}
publicThreadGroupgetThreadGroup(){
returnthreadGroup;
}
}
复制代码
业务线程池定义实现
复制代码
/
@filename:RpcThreadPool.java
NewlandCo.Ltd.Allrightsreserved.
@Description:rpc线程池封装
@authortangjie
@version1.0
/
packagenewlandframework.netty.rpc.core;
importjava.util.concurrent.Executor;
importjava.util.concurrent.LinkedBlockingQueue;
importjava.util.concurrent.SynchronousQueue;
importjava.util.concurrent.ThreadPoolExecutor;
importjava.util.concurrent.TimeUnit;
publicclassRpcThreadPool{
//独立出线程池主要是为了应对复杂耗I/O操作的业务,不阻塞netty的handler线程而引入
//当然如果业务足够简单,把处理逻辑写入netty的handler(ChannelInboundHandlerAdapter)也未尝不可
publicstaticExecutorgetExecutor(intthreads,intqueues){
Stringname="RpcThreadPool";
returnnewThreadPoolExecutor(threads,threads,0,TimeUnit.MILLISECONDS,
queues==0?newSynchronousQueue()
:(queues<0?newLinkedBlockingQueue()
:newLinkedBlockingQueue(queues)),
newNamedThreadFactory(name,true),newAbortPolicyWithReport(name));
}
}
复制代码
复制代码
/
@filename:AbortPolicyWithReport.java
NewlandCo.Ltd.Allrightsreserved.
@Description:线程池异常策略
@authortangjie
@version1.0
/
packagenewlandframework.netty.rpc.core;
importjava.util.concurrent.RejectedExecutionException;
importjava.util.concurrent.ThreadPoolExecutor;
publicclassAbortPolicyWithReportextendsThreadPoolExecutor.AbortPolicy{
privatefinalStringthreadName;
publicAbortPolicyWithReport(StringthreadName){
this.threadName=threadName;
}
publicvoidrejectedExecution(Runnabler,ThreadPoolExecutore){
Stringmsg=String.format("RpcServer["
+"ThreadName:%s,PoolSize:%d(active:%d,core:%d,max:%d,largest:%d),Task:%d(completed:%d),"
+"Executorstatus:(isShutdown:%s,isTerminated:%s,isTerminating:%s)]",
threadName,e.getPoolSize(),e.getActiveCount(),e.getCorePoolSize(),e.getMaximumPoolSize(),e.getLargestPoolSize(),
e.getTaskCount(),e.getCompletedTaskCount(),e.isShutdown(),e.isTerminated(),e.isTerminating());
System.out.println(msg);
thrownewRejectedExecutionException(msg);
}
}
复制代码
RPC调用客户端定义实现
复制代码
/
@filename:MessageSendExecutor.java
NewlandCo.Ltd.Allrightsreserved.
@Description:Rpc客户端执行模块
@authortangjie
@version1.0
/
packagenewlandframework.netty.rpc.core;
importjava.lang.reflect.Proxy;
publicclassMessageSendExecutor{
privateRpcServerLoaderloader=RpcServerLoader.getInstance();
publicMessageSendExecutor(StringserverAddress){
loader.load(serverAddress);
}
publicvoidstop(){
loader.unLoad();
}
publicstaticTexecute(ClassrpcInterface){
return(T)Proxy.newProxyInstance(
rpcInterface.getClassLoader(),
newClass>[]{rpcInterface},
newMessageSendProxy(rpcInterface)
);
}
}
复制代码
这里的RPC客户端实际上,是动态代理了MessageSendProxy,当然这里是应用了,JDK原生的动态代理实现,你还可以改成CGLIB(CodeGenerationLibrary)方式。不过本人测试了一下CGLIB方式,在高并发的情况下面会出现空指针异常,但是同样的情况,JDK原生的动态代理却没有问题。并发程度不高的情况下面,两种代理方式都运行正常。后续再深入研究看看吧!废话不说了,现在给出MessageSendProxy的实现方式
复制代码
/
@filename:MessageSendProxy.java
NewlandCo.Ltd.Allrightsreserved.
@Description:Rpc客户端消息处理
@authortangjie
@version1.0
/
packagenewlandframework.netty.rpc.core;
importjava.lang.reflect.InvocationHandler;
importjava.lang.reflect.Method;
importjava.util.UUID;
importnewlandframework.netty.rpc.model.MessageRequest;
publicclassMessageSendProxyimplementsInvocationHandler{
privateClasscls;
publicMessageSendProxy(Classcls){
this.cls=cls;
}
publicObjectinvoke(Objectproxy,Methodmethod,Object[]args)throwsThrowable{
MessageRequestrequest=newMessageRequest();
request.setMessageId(UUID.randomUUID().toString());
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setTypeParameters(method.getParameterTypes());
request.setParameters(args);
MessageSendHandlerhandler=RpcServerLoader.getInstance().getMessageSendHandler();
MessageCallBackcallBack=handler.sendRequest(request);
returncallBack.start();
}
}
复制代码
进一步发现MessageSendProxy其实是把消息发送给RpcServerLoader模块,它的代码如下:
复制代码
/
@filename:RpcServerLoader.java
NewlandCo.Ltd.Allrightsreserved.
@Description:rpc服务器配置加载
@authortangjie
@version1.0
/
packagenewlandframework.netty.rpc.core;
importio.netty.channel.EventLoopGroup;
importio.netty.channel.nio.NioEventLoopGroup;
importjava.net.InetSocketAddress;
importjava.util.concurrent.ThreadPoolExecutor;
importjava.util.concurrent.locks.Condition;
importjava.util.concurrent.locks.Lock;
importjava.util.concurrent.locks.ReentrantLock;
importnewlandframework.netty.rpc.serialize.support.RpcSerializeProtocol;
publicclassRpcServerLoader{
privatevolatilestaticRpcServerLoaderrpcServerLoader;
privatefinalstaticStringDELIMITER=":";
privateRpcSerializeProtocolserializeProtocol=RpcSerializeProtocol.JDKSERIALIZE;
//方法返回到Java虚拟机的可用的处理器数量
privatefinalstaticintparallel=Runtime.getRuntime().availableProcessors()2;
//nettynio线程池
privateEventLoopGroupeventLoopGroup=newNioEventLoopGroup(parallel);
privatestaticThreadPoolExecutorthreadPoolExecutor=(ThreadPoolExecutor)RpcThreadPool.getExecutor(16,-1);
privateMessageSendHandlermessageSendHandler=null;
//等待Netty服务端链路建立通知信号
privateLocklock=newReentrantLock();
privateConditionsignal=lock.newCondition();
privateRpcServerLoader(){
}
//并发双重锁定
publicstaticRpcServerLoadergetInstance(){
if(rpcServerLoader==null){
synchronized(RpcServerLoader.class){
if(rpcServerLoader==null){
rpcServerLoader=newRpcServerLoader();
}
}
}
returnrpcServerLoader;
}
publicvoidload(StringserverAddress,RpcSerializeProtocolserializeProtocol){
String[]ipAddr=serverAddress.split(RpcServerLoader.DELIMITER);
if(ipAddr.length==2){
Stringhost=ipAddr[0];
intport=Integer.parseInt(ipAddr[1]);
finalInetSocketAddressremoteAddr=newInetSocketAddress(host,port);
threadPoolExecutor.submit(newMessageSendInitializeTask(eventLoopGroup,remoteAddr,this,serializeProtocol));
}
}
publicvoidsetMessageSendHandler(MessageSendHandlermessageInHandler){
try{
lock.lock();
this.messageSendHandler=messageInHandler;
//唤醒所有等待客户端RPC线程
signal.signalAll();
}finally{
lock.unlock();
}
}
publicMessageSendHandlergetMessageSendHandler()throwsInterruptedException{
try{
lock.lock();
//Netty服务端链路没有建立完毕之前,先挂起等待
if(messageSendHandler==null){
signal.await();
}
returnmessageSendHandler;
}finally{
lock.unlock();
}
}
publicvoidunLoad(){
messageSendHandler.close();
threadPoolExecutor.shutdown();
eventLoopGroup.shutdownGracefully();
}
publicvoidsetSerializeProtocol(RpcSerializeProtocolserializeProtocol){
this.serializeProtocol=serializeProtocol;
}
}
复制代码
好了,现在一次性给出RPC客户端消息编码、解码、处理的模块实现代码。
复制代码
/
@filename:MessageSendInitializeTask.java
NewlandCo.Ltd.Allrightsreserved.
@Description:Rpc客户端线程任务处理
@authortangjie
@version1.0
/
packagenewlandframework.netty.rpc.core;
importio.netty.bootstrap.Bootstrap;
importio.netty.channel.ChannelFuture;
importio.netty.channel.ChannelFutureListener;
importio.netty.channel.ChannelOption;
importio.netty.channel.EventLoopGroup;
importio.netty.channel.socket.nio.NioSocketChannel;
importjava.net.InetSocketAddress;
publicclassMessageSendInitializeTaskimplementsRunnable{
privateEventLoopGroupeventLoopGroup=null;
privateInetSocketAddressserverAddress=null;
privateRpcServerLoaderloader=null;
MessageSendInitializeTask(EventLoopGroupeventLoopGroup,InetSocketAddressserverAddress,RpcServerLoaderloader){
this.eventLoopGroup=eventLoopGroup;
this.serverAddress=serverAddress;
this.loader=loader;
}
publicvoidrun(){
Bootstrapb=newBootstrap();
b.group(eventLoopGroup)
.channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE,true);
b.handler(newMessageSendChannelInitializer());
ChannelFuturechannelFuture=b.connect(serverAddress);
channelFuture.addListener(newChannelFutureListener(){
publicvoidoperationComplete(finalChannelFuturechannelFuture)throwsException{
if(channelFuture.isSuccess()){
MessageSendHandlerhandler=channelFuture.channel().pipeline().get(MessageSendHandler.class);
MessageSendInitializeTask.this.loader.setMessageSendHandler(handler);
}
}
});
}
}
复制代码
复制代码
/
@filename:MessageSendChannelInitializer.java
NewlandCo.Ltd.Allrightsreserved.
@Description:Rpc客户端管道初始化
@authortangjie
@version1.0
/
packagenewlandframework.netty.rpc.core;
importio.netty.channel.ChannelInitializer;
importio.netty.channel.ChannelPipeline;
importio.netty.channel.socket.SocketChannel;
importio.netty.handler.codec.LengthFieldBasedFrameDecoder;
importio.netty.handler.codec.LengthFieldPrepender;
importio.netty.handler.codec.serialization.ClassResolvers;
importio.netty.handler.codec.serialization.ObjectDecoder;
importio.netty.handler.codec.serialization.ObjectEncoder;
publicclassMessageSendChannelInitializerextendsChannelInitializer{
//ObjectDecoder底层默认继承半包解码器LengthFieldBasedFrameDecoder处理粘包问题的时候,
//消息头开始即为长度字段,占据4个字节。这里出于保持兼容的考虑
finalpublicstaticintMESSAGE_LENGTH=4;
protectedvoidinitChannel(SocketChannelsocketChannel)throwsException{
ChannelPipelinepipeline=socketChannel.pipeline();
//ObjectDecoder的基类半包解码器LengthFieldBasedFrameDecoder的报文格式保持兼容。因为底层的父类LengthFieldBasedFrameDecoder
//的初始化参数即为super(maxObjectSize,0,4,0,4);
pipeline.addLast(newLengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,MessageSendChannelInitializer.MESSAGE_LENGTH,0,MessageSendChannelInitializer.MESSAGE_LENGTH));
//利用LengthFieldPrepender回填补充ObjectDecoder消息报文头
pipeline.addLast(newLengthFieldPrepender(MessageSendChannelInitializer.MESSAGE_LENGTH));
pipeline.addLast(newObjectEncoder());
//考虑到并发性能,采用weakCachingConcurrentResolver缓存策略。一般情况使用:cacheDisabled即可
pipeline.addLast(newObjectDecoder(Integer.MAX_VALUE,ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
pipeline.addLast(newMessageSendHandler());
}
}
复制代码
复制代码
/
@filename:MessageSendHandler.java
NewlandCo.Ltd.Allrightsreserved.
@Description:Rpc客户端处理模块
@authortangjie
@version1.0
/
packagenewlandframework.netty.rpc.core;
importio.netty.buffer.Unpooled;
importio.netty.channel.Channel;
importio.netty.channel.ChannelFutureListener;
importio.netty.channel.ChannelHandlerContext;
importio.netty.channel.ChannelInboundHandlerAdapter;
importjava.net.SocketAddress;
importjava.util.concurrent.ConcurrentHashMap;
importnewlandframework.netty.rpc.model.MessageRequest;
importnewlandframework.netty.rpc.model.MessageResponse;
publicclassMessageSendHandlerextendsChannelInboundHandlerAdapter{
privateConcurrentHashMapmapCallBack=newConcurrentHashMap();
privatevolatileChannelchannel;
privateSocketAddressremoteAddr;
publicChannelgetChannel(){
returnchannel;
}
publicSocketAddressgetRemoteAddr(){
returnremoteAddr;
}
publicvoidchannelActive(ChannelHandlerContextctx)throwsException{
super.channelActive(ctx);
this.remoteAddr=this.channel.remoteAddress();
}
publicvoidchannelRegistered(ChannelHandlerContextctx)throwsException{
super.channelRegistered(ctx);
this.channel=ctx.channel();
}
publicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{
MessageResponseresponse=(MessageResponse)msg;
StringmessageId=response.getMessageId();
MessageCallBackcallBack=mapCallBack.get(messageId);
if(callBack!=null){
mapCallBack.remove(messageId);
callBack.over(response);
}
}
publicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{
ctx.close();
}
publicvoidclose(){
channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
publicMessageCallBacksendRequest(MessageRequestrequest){
MessageCallBackcallBack=newMessageCallBack(request);
mapCallBack.put(request.getMessageId(),callBack);
channel.writeAndFlush(request);
returncallBack;
}
}
复制代码
最后给出RPC服务端的实现。首先是通过spring自动加载RPC服务接口、接口实现容器绑定加载,初始化Netty主/从线程池等操作,具体是通过MessageRecvExecutor模块实现的,现在给出实现代码:
复制代码
/
@filename:MessageRecvExecutor.java
NewlandCo.Ltd.Allrightsreserved.
@Description:Rpc服务器执行模块
@authortangjie
@version1.0
/
packagenewlandframework.netty.rpc.core;
importio.netty.bootstrap.ServerBootstrap;
importio.netty.channel.ChannelFuture;
importio.netty.channel.ChannelOption;
importio.netty.channel.EventLoopGroup;
importio.netty.channel.nio.NioEventLoopGroup;
importio.netty.channel.socket.nio.NioServerSocketChannel;
importjava.nio.channels.spi.SelectorProvider;
importjava.util.Iterator;
importjava.util.Map;
importjava.util.Set;
importjava.util.concurrent.ConcurrentHashMap;
importjava.util.concurrent.ThreadFactory;
importjava.util.concurrent.ThreadPoolExecutor;
importjava.util.logging.Level;
importnewlandframework.netty.rpc.model.MessageKeyVal;
importorg.springframework.beans.BeansException;
importorg.springframework.beans.factory.InitializingBean;
importorg.springframework.context.ApplicationContext;
importorg.springframework.context.ApplicationContextAware;
publicclassMessageRecvExecutorimplementsApplicationContextAware,InitializingBean{
privateStringserverAddress;
privatefinalstaticStringDELIMITER=":";
privateMaphandlerMap=newConcurrentHashMap();
privatestaticThreadPoolExecutorthreadPoolExecutor;
publicMessageRecvExecutor(StringserverAddress){
this.serverAddress=serverAddress;
}
publicstaticvoidsubmit(Runnabletask){
if(threadPoolExecutor==null){
synchronized(MessageRecvExecutor.class){
if(threadPoolExecutor==null){
threadPoolExecutor=(ThreadPoolExecutor)RpcThreadPool.getExecutor(16,-1);
}
}
}
threadPoolExecutor.submit(task);
}
publicvoidsetApplicationContext(ApplicationContextctx)throwsBeansException{
try{
MessageKeyValkeyVal=(MessageKeyVal)ctx.getBean(Class.forName("newlandframework.netty.rpc.model.MessageKeyVal"));
MaprpcServiceObject=keyVal.getMessageKeyVal();
Sets=rpcServiceObject.entrySet();
Iterator>it=s.iterator();
Map.Entryentry;
while(it.hasNext()){
entry=it.next();
handlerMap.put(entry.getKey(),entry.getValue());
}
}catch(ClassNotFoundExceptionex){
java.util.logging.Logger.getLogger(MessageRecvExecutor.class.getName()).log(Level.SEVERE,null,ex);
}
}
publicvoidafterPropertiesSet()throwsException{
//netty的线程池模型设置成主从线程池模式,这样可以应对高并发请求
//当然netty还支持单线程、多线程网络IO模型,可以根据业务需求灵活配置
ThreadFactorythreadRpcFactory=newNamedThreadFactory("NettyRPCThreadFactory");
//方法返回到Java虚拟机的可用的处理器数量
intparallel=Runtime.getRuntime().availableProcessors()2;
EventLoopGroupboss=newNioEventLoopGroup();
EventLoopGroupworker=newNioEventLoopGroup(parallel,threadRpcFactory,SelectorProvider.provider());
try{
ServerBootstrapbootstrap=newServerBootstrap();
bootstrap.group(boss,worker).channel(NioServerSocketChannel.class)
.childHandler(newMessageRecvChannelInitializer(handlerMap))
.option(ChannelOption.www.wang027.comSO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE,true);
String[]ipAddr=serverAddress.split(MessageRecvExecutor.DELIMITER);
if(ipAddr.length==2){
Stringhost=ipAddr[0];
intport=Integer.parseInt(ipAddr[1]);
ChannelFuturefuture=bootstrap.bind(host,port).sync();
System.out.printf("[authortangjie]NettyRPCServerstartsuccessip:%sport:%d\n",host,port);
future.channel().closeFuture().sync();
}else{
System.out.printf("[authortangjie]NettyRPCServerstartfail!\n");
}
}finally{
worker.shutdownGracefully();
boss.shutdownGracefully();
}
}
}
复制代码
最后还是老规矩,给出RPC服务端消息编码、解码、处理的核心模块代码实现,具体如下:
复制代码
/
@filename:MessageRecvChannelInitializer.java
NewlandCo.Ltd.Allrightsreserved.
@Description:Rpc服务端管道初始化
@authortangjie
@version1.0
/
packagenewlandframework.netty.rpc.core;
importio.netty.channel.ChannelInitializer;
importio.netty.channel.ChannelPipeline;
importio.netty.channel.socket.SocketChannel;
importio.netty.handler.codec.LengthFieldBasedFrameDecoder;
importio.netty.handler.codec.LengthFieldPrepender;
importio.netty.handler.codec.serialization.ClassResolvers;
importio.netty.handler.codec.serialization.ObjectDecoder;
importio.netty.handler.codec.serialization.ObjectEncoder;
importjava.util.Map;
publicclassMessageRecvChannelInitializerextendsChannelInitializer{
//ObjectDecoder底层默认继承半包解码器LengthFieldBasedFrameDecoder处理粘包问题的时候,
//消息头开始即为长度字段,占据4个字节。这里出于保持兼容的考虑
finalpublicstaticintMESSAGE_LENGTH=4;
privateMaphandlerMap=null;
MessageRecvChannelInitializer(MaphandlerMap){
this.handlerMap=handlerMap;
}
protectedvoidinitChannel(SocketChannelsocketChannel)throwsException{
ChannelPipelinepipeline=socketChannel.pipeline();
//ObjectDecoder的基类半包解码器LengthFieldBasedFrameDecoder的报文格式保持兼容。因为底层的父类LengthFieldBasedFrameDecoder
//的初始化参数即为super(maxObjectSize,0,4,0,4);
pipeline.addLast(newLengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,MessageRecvChannelInitializer.MESSAGE_LENGTH,0,MessageRecvChannelInitializer.MESSAGE_LENGTH));
//利用LengthFieldPrepender回填补充ObjectDecoder消息报文头
pipeline.addLast(newLengthFieldPrepender(MessageRecvChannelInitializer.MESSAGE_LENGTH));
pipeline.addLast(newObjectEncoder());
//考虑到并发性能,采用weakCachingConcurrentResolver缓存策略。一般情况使用:cacheDisabled即可
pipeline.addLast(newObjectDecoder(Integer.MAX_VALUE,ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
pipeline.addLast(newMessageRecvHandler(handlerMap));
}
}
复制代码
复制代码
/
@filename:MessageRecvHandler.java
NewlandCo.Ltd.Allrightsreserved.
@Description:Rpc服务器消息处理
@authortangjie
@version1.0
/
packagenewlandframework.netty.rpc.core;
importio.netty.channel.ChannelHandlerContext;
importio.netty.channel.ChannelInboundHandlerAdapter;
importjava.util.Map;
importnewlandframework.netty.rpc.model.MessageRequest;
importnewlandframework.netty.rpc.model.MessageResponse;
publicclassMessageRecvHandlerextendsChannelInboundHandlerAdapter{
privatefinalMaphandlerMap;
publicMessageRecvHandler(MaphandlerMap){
this.handlerMap=handlerMap;
}
publicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{
MessageRequestrequest=(MessageRequest)msg;
MessageResponseresponse=newMessageResponse();
MessageRecvInitializeTaskrecvTask=newMessageRecvInitializeTask(request,response,handlerMap,ctx);
//不要阻塞nio线程,复杂的业务逻辑丢给专门的线程池
MessageRecvExecutor.submit(recvTask);
}
publicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause){
//网络有异常要关闭通道
ctx.close();
}
}
复制代码
复制代码
/
@filename:MessageRecvInitializeTask.java
NewlandCo.Ltd.Allrightsreserved.
@Description:Rpc服务器消息线程任务处理
@authortangjie
@version1.0
/
packagenewlandframework.netty.rpc.core;
importio.netty.channel.ChannelFuture;
importio.netty.channel.ChannelFutureListener;
importio.netty.channel.ChannelHandlerContext;
importjava.util.Map;
importnewlandframework.netty.rpc.model.MessageRequest;
importnewlandframework.netty.rpc.model.MessageResponse;
importorg.apache.commons.beanutils.MethodUtils;
publicclassMessageRecvInitializeTaskimplementsRunnable{
privateMessageRequestrequest=null;
privateMessageResponseresponse=null;
privateMaphandlerMap=null;
privateChannelHandlerContextctx=null;
publicMessageResponsegetResponse(){
returnresponse;
}
publicMessageRequestgetRequest(){
returnrequest;
}
publicvoidsetRequest(MessageRequestrequest){
this.request=request;
}
MessageRecvInitializeTask(MessageRequestrequest,MessageResponseresponse,MaphandlerMap,ChannelHandlerContextctx){
this.request=request;
this.response=response;
this.handlerMap=handlerMap;
this.ctx=ctx;
}
publicvoidrun(){
response.setMessageId(request.getMessageId());
try{
Objectresult=reflect(request);
response.setResult(result);
}catch(Throwablet){
response.setError(t.toString());
t.printStackTrace();
System.err.printf("RPCServerinvokeerror!\n");
}
ctx.writeAndFlush(response).addListener(newChannelFutureListener(){
publicvoidoperationComplete(ChannelFuturechannelFuture)throwsException{
System.out.println("RPCServerSendmessage-idrespone:"+request.getMessageId());
}
});
}
privateObjectreflect(MessageRequestrequest)throwsThrowable{
StringclassName=request.getClassName();
ObjectserviceBean=handlerMap.get(className);
StringmethodName=request.getMethodName();
Object[]parameters=request.getParameters();
returnMethodUtils.invokeMethod(serviceBean,methodName,parameters);
}
}
复制代码
然后是RPC消息处理的回调实现模块代码
复制代码
/
@filename:MessageCallBack.java
NewlandCo.Ltd.Allrightsreserved.
@Description:Rpc消息回调
@authortangjie
@version1.0
/
packagenewlandframework.netty.rpc.core;
importjava.util.concurrent.TimeUnit;
importjava.util.concurrent.locks.Condition;
importjava.util.concurrent.locks.Lock;
importjava.util.concurrent.locks.ReentrantLock;
importnewlandframework.netty.rpc.model.MessageRequest;
importnewlandframework.netty.rpc.model.MessageResponse;
publicclassMessageCallBack{
privateMessageRequestrequest;
privateMessageResponseresponse;
privateLocklock=newReentrantLock();
privateConditionfinish=lock.newCondition();
publicMessageCallBack(MessageRequestrequest){
this.request=request;
}
publicObjectstart()throwsInterruptedException{
try{
lock.lock();
//设定一下超时时间,rpc服务器太久没有相应的话,就默认返回空吧。
finish.await(101000,TimeUnit.MILLISECONDS);
if(this.response!=null){
returnthis.response.getResult();
}else{
returnnull;
}
}finally{
lock.unlock();
}
}
publicvoidover(MessageResponsereponse){
try{
lock.lock();
finish.signal();
this.response=reponse;
}finally{
lock.unlock();
}
}
}
复制代码
到此为止,NettyRPC的关键部分:服务端、客户端的模块已经通过Netty全部实现了。现在给出spring加载配置rpc-invoke-config.xml的内容:
复制代码
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
复制代码
再贴出RPC服务绑定ip信息的配置文件:rpc-server.properties的内容。
#rpcserver''sipaddressconfig
rpc.server.addr=127.0.0.1:18888
最后NettyRPC服务端启动方式参考如下:
newClassPathXmlApplicationContext("newlandframework/netty/rpc/config/rpc-invoke-config.xml");
如果一切顺利,没有出现意外的话,控制台上面,会出现如下截图所示的情况:
如果出现了,说明NettyRPC服务器,已经启动成功!
上面基于Netty的RPC服务器,并发处理性能如何呢?实践是检验真理的唯一标准,下面我们就来实战一下。
下面的测试案例,是基于RPC远程调用两数相加函数,并返回计算结果。客户端同时开1W个线程,同一时刻,瞬时发起并发计算请求,然后观察Netty的RPC服务器是否有正常应答回复响应,以及客户端是否有正常返回调用计算结果。值得注意的是,测试案例是基于1W个线程瞬时并发请求而设计的,并不是1W个线程循环发起请求。这两者对于衡量RPC服务器的并发处理性能,还是有很大差别的。当然,前者对于并发性能的处理要求,要高上很多很多。
现在,先给出RPC计算接口、RPC计算接口实现类的代码实现:
复制代码
/
@filename:Calculate.java
NewlandCo.Ltd.Allrightsreserved.
@Description:计算器定义接口
@authortangjie
@version1.0
/
packagenewlandframework.netty.rpc.servicebean;
publicinterfaceCalculate{
//两数相加
intadd(inta,intb);
}
复制代码
复制代码
/
@filename:CalculateImpl.java
NewlandCo.Ltd.Allrightsreserved.
@Description:计算器定义接口实现
@authortangjie
@version1.0
/
packagenewlandframework.netty.rpc.servicebean;
publicclassCalculateImplimplementsCalculate{
//两数相加
publicintadd(inta,intb){
returna+b;
}
}
复制代码
下面是瞬时并发RPC请求的测试样例:
复制代码
/
@filename:CalcParallelRequestThread.java
NewlandCo.Ltd.Allrightsreserved.
@Description:并发线程模拟
@authortangjie
@version1.0
/
packagenewlandframework.netty.rpc.servicebean;
importnewlandframework.netty.rpc.core.MessageSendExecutor;
importjava.util.concurrent.CountDownLatch;
importjava.util.logging.Level;
importjava.util.logging.Logger;
publicclassCalcParallelRequestThreadimplementsRunnable{
privateCountDownLatchsignal;
privateCountDownLatchfinish;
privateMessageSendExecutorexecutor;
privateinttaskNumber=0;
publicCalcParallelRequestThread(MessageSendExecutorexecutor,CountDownLatchsignal,CountDownLatchfinish,inttaskNumber){
this.signal=signal;
this.finish=finish;
this.taskNumber=taskNumber;
this.executor=executor;
}
publicvoidrun(){
try{
signal.await();
Calculatecalc=executor.execute(Calculate.class);
intadd=calc.add(taskNumber,taskNumber);
System.out.println("calcaddresult:["+add+"]");
finish.countDown();
}catch(InterruptedExceptionex){
Logger.getLogger(CalcParallelRequestThread.class.getName()).log(Level.SEVERE,null,ex);
}
}
}
复制代码
复制代码
/
@filename:RpcParallelTest.java
NewlandCo.Ltd.Allrightsreserved.
@Description:rpc并发测试代码
@authortangjie
@version1.0
/
packagenewlandframework.netty.rpc.servicebean;
importjava.util.concurrent.CountDownLatch;
importnewlandframework.netty.rpc.core.MessageSendExecutor;
importorg.apache.commons.lang.time.StopWatch;
publicclassRpcParallelTest{
publicstaticvoidmain(String[]args)throwsException{
finalMessageSendExecutorexecutor=newMessageSendExecutor("127.0.0.1:18888");
//并行度10000
intparallel=10000;
//开始计时
StopWatchsw=newStopWatch();
sw.start();
CountDownLatchsignal=newCountDownLatch(1);
CountDownLatchfinish=newCountDownLatch(parallel);
for(intindex=0;index CalcParallelRequestThreadclient=newCalcParallelRequestThread(executor,signal,finish,index);
newThread(client).start();
}
//10000个并发线程瞬间发起请求操作
signal.countDown();
finish.await();
sw.stop();
Stringtip=String.format("RPC调用总共耗时:[%s]毫秒",sw.getTime());
System.out.println(tip);
executor.stop();
}
|
|