谈谈如何使用Netty开发实现高性能的RPC服务器
RPC(RemoteProcedureCallProtocol)远程过程调用协议,它是一种通过网络,从远程计算机程序上请求服务,而不必了解底层网络技术的协议。说的再直白一点,就是客户端在不必知道调用细节的前提之下,调用远程计算机上运行的某个对象,使用起来就像调用本地的对象一样。目前典型的RPC实现框架有:Thrift(facebook开源)、Dubbo(alibaba开源)等等。RPC框架针对网络协议、网络I/O模型的封装是透明的,对于调用的客户端而言,它就认为自己在调用本地的一个对象。至于传输层上,运用的是TCP协议、UDP协议、亦或是HTTP协议,一概不关心。从网络I/O模型上来看,是基于select、poll、epoll方式、还是IOCP(I/OCompletionPort)方式承载实现的,对于调用者而言也不用关心。
目前,主流的RPC框架都支持跨语言调用,即有所谓的IDL(接口定义语言),其实,这个并不是RPC所必须要求的。如果你的RPC框架没有跨语言的要求,IDL就可以不用包括了。
最后,值得一提的是,衡量一个RPC框架性能的好坏与否,RPC的网络I/O模型的选择,至关重要。在此基础上,设计出来的RPC服务器,可以考虑支持阻塞式同步IO、非阻塞式同步IO、当然还有所谓的多路复用IO模型、异步IO模型。支持不同的网络IO模型,在高并发的状态下,处理性能上会有很大的差别。还有一个衡量的标准,就是选择的传输协议。是基于TCP协议、还是HTTP协议、还是UDP协议?对性能也有一定的影响。但是从我目前了解的情况来看,大多数RPC开源实现框架都是基于TCP、或者HTTP的,目测没有采用UDP协议做为主要的传输协议的。
明白了RPC的使用原理和性能要求。现在,我们能不能撇开那些RPC开源框架,自己动手开发一个高性能的RPC服务器呢?我想,还是可以的。现在本人就使用Java,基于Netty,开发实现一个高性能的RPC服务器。
如何实现、基于什么原理?并发处理性能如何?请继续接着看下文。
我们有的时候,为了提高单个节点的通信吞吐量,提高通信性能。如果是基于Java后端的,一般首选的是NIO框架(No-blockIO)。但是问题也来了,Java的NIO掌握起来要相当的技术功底,和足够的技术积累,使用起来才能得心应手。一般的开发人员,如果要使用NIO开发一个后端的TCP/HTTP服务器,附带考虑TCP粘包、网络通信异常、消息链接处理等等网络通信细节,开发门槛太高,所以比较明智的选择是,采用业界主流的NIO框架进行服务器后端开发。主流的NIO框架主要有Netty、Mina。它们主要都是基于TCP通信,非阻塞的IO、灵活的IO线程池而设计的,应对高并发请求也是绰绰有余。随着Netty、Mina这样优秀的NIO框架,设计上日趋完善,Java后端高性能服务器开发,在技术上提供了有力的支持保障,从而打破了C++在服务器后端,一统天下的局面。因为在此之前,Java的NIO一直受人诟病,让人敬而远之!
既然,这个RPC服务器是基于Netty的,那就在说说Netty吧。实际上Netty是对JAVANIO框架的再次封装,它的开源网址是http://netty.io/,本文中使用的Netty版本是:4.0版本,可以通过http://dl.bintray.com/netty/downloads/netty-4.0.37.Final.tar.bz2,进行下载使用。那也许你会问,如何使用Netty进行RPC服务器的开发呢?实际不难,下面我就简单的说明一下技术原理:
1、定义RPC请求消息、应答消息结构,里面要包括RPC的接口定义模块、包括远程调用的类名、方法名称、参数结构、参数值等信息。
2、服务端初始化的时候通过容器加载RPC接口定义和RPC接口实现类对象的映射关系,然后等待客户端发起调用请求。
3、客户端发起的RPC消息里面包含,远程调用的类名、方法名称、参数结构、参数值等信息,通过网络,以字节流的方式送给RPC服务端,RPC服务端接收到字节流的请求之后,去对应的容器里面,查找客户端接口映射的具体实现对象。
4、RPC服务端找到实现对象的参数信息,通过反射机制创建该对象的实例,并返回调用处理结果,最后封装成RPC应答消息通知到客户端。
5、客户端通过网络,收到字节流形式的RPC应答消息,进行拆包、解析之后,显示远程调用结果。
上面说的是很简单,但是实现的时候,我们还要考虑如下的问题:
1、RPC服务器的传输层是基于TCP协议的,出现粘包咋办?这样客户端的请求,服务端不是会解析失败?好在Netty里面已经提供了解决TCP粘包问题的解码器:LengthFieldBasedFrameDecoder,可以靠它轻松搞定TCP粘包问题。
2、Netty服务端的线程模型是单线程、多线程(一个线程负责客户端连接,连接成功之后,丢给后端IO的线程池处理)、还是主从模式(客户端连接、后端IO处理都是基于线程池的实现)。当然在这里,我出于性能考虑,使用了Netty主从线程池模型。
3、Netty的IO处理线程池,如果遇到非常耗时的业务,出现阻塞了咋办?这样不是很容易把后端的NIO线程给挂死、阻塞?本文的处理方式是,对于复杂的后端业务,分派到专门的业务线程池里面,进行异步回调处理。
4、RPC消息的传输是通过字节流在NIO的通道(Channel)之间传输,那具体如何实现呢?本文,是通过基于Java原生对象序列化机制的编码、解码器(ObjectEncoder、ObjectDecoder)进行实现的。当然出于性能考虑,这个可能不是最优的方案。更优的方案是把消息的编码、解码器,搞成可以配置实现的。具体比如可以通过:protobuf、JBoss?Marshalling方式进行解码和编码,以提高网络消息的传输效率。
5、RPC服务器要考虑多线程、高并发的使用场景,所以线程安全是必须的。此外尽量不要使用synchronized进行加锁,改用轻量级的ReentrantLock方式进行代码块的条件加锁。比如本文中的RPC消息处理回调,就有这方面的使用。
6、RPC服务端的服务接口对象和服务接口实现对象要能轻易的配置,轻松进行加载、卸载。在这里,本文是通过Spring容器进行统一的对象管理。
综上所述,本文设计的RPC服务器调用的流程图如下所示:
???
客户端并发发起RPC调用请求,然后RPC服务端使用Netty连接器,分派出N个NIO连接线程,这个时候Netty连接器的任务结束。然后NIO连接线程是统一放到NettyNIO处理线程池进行管理,这个线程池里面会对具体的RPC请求连接进行消息编码、消息解码、消息处理等等一系列操作。最后进行消息处理(Handler)的时候,处于性能考虑,这里的设计是,直接把复杂的消息处理过程,丢给专门的RPC业务处理线程池集中处理,然后Handler对应的NIO线程就立即返回、不会阻塞。这个时候RPC调用结束,客户端会异步等待服务端消息的处理结果,本文是通过消息回调机制实现(MessageCallBack)。
再来说一说Netty对于RPC消息的解码、编码、处理对应的模块和流程,具体如下图所示:
??
从上图可以看出客户端、服务端对RPC消息编码、解码、处理调用的模块以及调用顺序了。Netty就是把这样一个一个的处理器串在一起,形成一个责任链,统一进行调用。
说了这么多,现在先简单看下,我设计实现的NettyRPC的代码目录层级结构:
???
其中newlandframework.netty.rpc.core包是NettyRPC的核心实现。newlandframework.netty.rpc.model包里面,则封装了RPC消息请求、应答报文结构,以及RPC服务接口与实现绑定关系的容器定义。newlandframework.netty.rpc.config里面定义了NettyRPC的服务端文件配置属性。
下面先来看下newlandframework.netty.rpc.model包中定义的内容。具体是RPC消息请求、应答消息的结构定义:
RPC请求消息结构
RPC应答消息结构
RPC服务接口定义、服务接口实现绑定关系容器定义,提供给spring作为容器使用。
好了,定义好核心模型结构之后,现在再向大家展示一下NettyRPC核心包:newlandframework.netty.rpc.core的关键部分实现代码,首先是业务线程池相关类的实现代码,具体如下:
线程工厂定义实现
业务线程池定义实现
/
@filename:AbortPolicyWithReport.java
NewlandCo.Ltd.Allrightsreserved.
@Description:线程池异常策略
@authortangjie
@version1.0
/packagenewlandframework.netty.rpc.core;
importjava.util.concurrent.RejectedExecutionException;importjava.util.concurrent.ThreadPoolExecutor;
publicclassAbortPolicyWithReportextendsThreadPoolExecutor.AbortPolicy{
privatefinalStringthreadName;
publicAbortPolicyWithReport(StringthreadName){
this.threadName=threadName;
}
publicvoidrejectedExecution(Runnabler,ThreadPoolExecutore){
Stringmsg=String.format("RpcServer["
+"ThreadName:%s,PoolSize:%d(active:%d,core:%d,max:%d,largest:%d),Task:%d(completed:%d),"
+"Executorstatus:(isShutdown:%s,isTerminated:%s,isTerminating:%s)]",
threadName,e.getPoolSize(),e.getActiveCount(),e.getCorePoolSize(),e.getMaximumPoolSize(),e.getLargestPoolSize(),
e.getTaskCount(),e.getCompletedTaskCount(),e.isShutdown(),e.isTerminated(),e.isTerminating());
System.out.println(msg);
thrownewRejectedExecutionException(msg);
}
}
RPC调用客户端定义实现
这里的RPC客户端实际上,是动态代理了MessageSendProxy,当然这里是应用了,JDK原生的动态代理实现,你还可以改成CGLIB(CodeGenerationLibrary)方式。不过本人测试了一下CGLIB方式,在高并发的情况下面会出现空指针异常,但是同样的情况,JDK原生的动态代理却没有问题。并发程度不高的情况下面,两种代理方式都运行正常。后续再深入研究看看吧!废话不说了,现在给出MessageSendProxy的实现方式
进一步发现MessageSendProxy其实是把消息发送给RpcServerLoader模块,它的代码如下:
好了,现在一次性给出RPC客户端消息编码、解码、处理的模块实现代码。
/
@filename:MessageSendChannelInitializer.java
NewlandCo.Ltd.Allrightsreserved.
@Description:Rpc客户端管道初始化
@authortangjie
@version1.0
/packagenewlandframework.netty.rpc.core;
importio.netty.channel.ChannelInitializer;importio.netty.channel.ChannelPipeline;importio.netty.channel.socket.SocketChannel;importio.netty.handler.codec.LengthFieldBasedFrameDecoder;importio.netty.handler.codec.LengthFieldPrepender;importio.netty.handler.codec.serialization.ClassResolvers;importio.netty.handler.codec.serialization.ObjectDecoder;importio.netty.handler.codec.serialization.ObjectEncoder;
publicclassMessageSendChannelInitializerextendsChannelInitializer{
//ObjectDecoder底层默认继承半包解码器LengthFieldBasedFrameDecoder处理粘包问题的时候,
//消息头开始即为长度字段,占据4个字节。这里出于保持兼容的考虑
finalpublicstaticintMESSAGE_LENGTH=4;
protectedvoidinitChannel(SocketChannelsocketChannel)throwsException{
ChannelPipelinepipeline=socketChannel.pipeline();
//ObjectDecoder的基类半包解码器LengthFieldBasedFrameDecoder的报文格式保持兼容。因为底层的父类LengthFieldBasedFrameDecoder
//的初始化参数即为super(maxObjectSize,0,4,0,4);
pipeline.addLast(newLengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,MessageSendChannelInitializer.MESSAGE_LENGTH,0,MessageSendChannelInitializer.MESSAGE_LENGTH));
//利用LengthFieldPrepender回填补充ObjectDecoder消息报文头
pipeline.addLast(newLengthFieldPrepender(MessageSendChannelInitializer.MESSAGE_LENGTH));
pipeline.addLast(newObjectEncoder());
//考虑到并发性能,采用weakCachingConcurrentResolver缓存策略。一般情况使用:cacheDisabled即可
pipeline.addLast(newObjectDecoder(Integer.MAX_VALUE,ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
pipeline.addLast(newMessageSendHandler());
}
}
/
@filename:MessageSendHandler.java
NewlandCo.Ltd.Allrightsreserved.
@Description:Rpc客户端处理模块
@authortangjie
@version1.0
/packagenewlandframework.netty.rpc.core;
importio.netty.buffer.Unpooled;importio.netty.channel.Channel;importio.netty.channel.ChannelFutureListener;importio.netty.channel.ChannelHandlerContext;importio.netty.channel.ChannelInboundHandlerAdapter;importjava.net.Socketwww.wang027.comAddress;importjava.util.concurrent.ConcurrentHashMap;importnewlandframework.netty.rpc.model.MessageRequest;importnewlandframework.netty.rpc.model.MessageResponse;
publicclassMessageSendHandlerextendsChannelInboundHandlerAdapter{
privateConcurrentHashMapmapCallBack=newConcurrentHashMap();
privatevolatileChannelchannel;
privateSocketAddressremoteAddr;
publicChannelgetChannel(){
returnchannel;
}
publicSocketAddressgetRemoteAddr(){
returnremoteAddr;
}
publicvoidchannelActive(ChannelHandlerContextctx)throwsException{
super.channelActive(ctx);
this.remoteAddr=this.channel.remoteAddress();
}
publicvoidchannelRegistered(ChannelHandlerContextctx)throwsException{
super.channelRegistered(ctx);
this.channel=ctx.channel();
}
publicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{
MessageResponseresponse=(MessageResponse)msg;
StringmessageId=response.getMessageId();
MessageCallBackcallBack=mapCallBack.get(messageId);
if(callBack!=null){
mapCallBack.remove(messageId);
callBack.over(response);
}
}
publicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{
ctx.close();
}
publicvoidclose(){
channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
publicMessageCallBacksendRequest(MessageRequestrequest){
MessageCallBackcallBack=newMessageCallBack(request);
mapCallBack.put(request.getMessageId(),callBack);
channel.writeAndFlush(request);
returncallBack;
}
}
最后给出RPC服务端的实现。首先是通过spring自动加载RPC服务接口、接口实现容器绑定加载,初始化Netty主/从线程池等操作,具体是通过MessageRecvExecutor模块实现的,现在给出实现代码:
最后还是老规矩,给出RPC服务端消息编码、解码、处理的核心模块代码实现,具体如下:
/
@filename:MessageRecvHandler.java
NewlandCo.Ltd.Allrightsreserved.
@Description:Rpc服务器消息处理
@authortangjie
@version1.0
/packagenewlandframework.netty.rpc.core;
importio.netty.channel.ChannelHandlerContext;importio.netty.channel.ChannelInboundHandlerAdapter;importjava.util.Map;importnewlandframework.netty.rpc.model.MessageRequest;importnewlandframework.netty.rpc.model.MessageResponse;
publicclassMessageRecvHandlerextendsChannelInboundHandlerAdapter{
privatefinalMaphandlerMap;
publicMessageRecvHandler(MaphandlerMap){
this.handlerMap=handlerMap;
}
publicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{
MessageRequestrequest=(MessageRequest)msg;
MessageResponseresponse=newMessageResponse();
MessageRecvInitializeTaskrecvTask=newMessageRecvInitializeTask(request,response,handlerMap,ctx);
//不要阻塞nio线程,复杂的业务逻辑丢给专门的线程池MessageRecvExecutor.submit(recvTask);
}
publicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause){
//网络有异常要关闭通道ctx.close();
}
}
/
@filename:MessageRecvInitializeTask.java
NewlandCo.Ltd.Allrightsreserved.
@Description:Rpc服务器消息线程任务处理
@authortangjie
@version1.0
/packagenewlandframework.netty.rpc.core;
importio.netty.channel.ChannelFuture;importio.netty.channel.ChannelFutureListener;importio.netty.channel.ChannelHandlerContext;importjava.util.Map;importnewlandframework.netty.rpc.model.MessageRequest;importnewlandframework.netty.rpc.model.MessageResponse;importorg.apache.commons.beanutils.MethodUtils;
publicclassMessageRecvInitializeTaskimplementsRunnable{
privateMessageRequestrequest=null;
privateMessageResponseresponse=null;
privateMaphandlerMap=null;
privateChannelHandlerContextctx=null;
publicMessageResponsegetResponse(){
returnresponse;
}
publicMessageRequestgetRequest(){
returnrequest;
}
publicvoidsetRequest(MessageRequestrequest){
this.request=request;
}
MessageRecvInitializeTask(MessageRequestrequest,MessageResponseresponse,MaphandlerMap,ChannelHandlerContextctx){
this.request=request;
this.response=response;
this.handlerMap=handlerMap;
this.ctx=ctx;
}
publicvoidrun(){
response.setMessageId(request.getMessageId());
try{
Objectresult=reflect(request);
response.setResult(result);
}catch(Throwablet){
response.setError(t.toString());
t.printStackTrace();
System.err.printf("RPCServerinvokeerror!\n");
}
ctx.writeAndFlush(response).addListener(newChannelFutureListener(){
publicvoidoperationComplete(ChannelFuturechannelFuture)throwsException{
System.out.println("RPCServerSendmessage-idrespone:"+request.getMessageId());
}
});
}
privateObjectreflect(MessageRequestrequest)throwsThrowable{
StringclassName=request.getClassName();
ObjectserviceBean=handlerMap.get(className);
StringmethodName=request.getMethodName();
Object[]parameters=request.getParameters();
returnMethodUtils.invokeMethod(serviceBean,methodName,parameters);
}
}
然后是RPC消息处理的回调实现模块代码
到此为止,NettyRPC的关键部分:服务端、客户端的模块已经通过Netty全部实现了。现在给出spring加载配置rpc-invoke-config.xml的内容:
再贴出RPC服务绑定ip信息的配置文件:rpc-server.properties的内容。
最后NettyRPC服务端启动方式参考如下:
如果一切顺利,没有出现意外的话,控制台上面,会出现如下截图所示的情况:
如果出现了,说明NettyRPC服务器,已经启动成功!
上面基于Netty的RPC服务器,并发处理性能如何呢?实践是检验真理的唯一标准,下面我们就来实战一下。
下面的测试案例,是基于RPC远程调用两数相加函数,并返回计算结果。客户端同时开1W个线程,同一时刻,瞬时发起并发计算请求,然后观察Netty的RPC服务器是否有正常应答回复响应,以及客户端是否有正常返回调用计算结果。值得注意的是,测试案例是基于1W个线程瞬时并发请求而设计的,并不是1W个线程循环发起请求。这两者对于衡量RPC服务器的并发处理性能,还是有很大差别的。当然,前者对于并发性能的处理要求,要高上很多很多。
现在,先给出RPC计算接口、RPC计算接口实现类的代码实现:
/
@filename:CalculateImpl.java
NewlandCo.Ltd.Allrightsreserved.
@Description:计算器定义接口实现
@authortangjie
@version1.0
/packagenewlandframework.netty.rpc.servicebean;
publicclassCalculateImplimplementsCalculate{
//两数相加
publicintadd(inta,intb){
returna+b;
}
}
下面是瞬时并发RPC请求的测试样例:
/
@filename:RpcParallelTest.java
NewlandCo.Ltd.Allrightsreserved.
@Description:rpc并发测试代码
@authortangjie
@version1.0
/packagenewlandframework.netty.rpc.servicebean;
importjava.util.concurrent.CountDownLatch;importnewlandframework.netty.rpc.core.MessageSendExecutor;importorg.apache.commons.lang.time.StopWatch;
publicclassRpcParallelTest{
publicstaticvoidmain(String[]args)throwsException{
finalMessageSendExecutorexecutor=newMessageSendExecutor("127.0.0.1:18888");
//并行度10000
intparallel=10000;
//开始计时
StopWatchsw=newStopWatch();
sw.start();
CountDownLatchsignal=newCountDownLatch(1);
CountDownLatchfinish=newCountDownLatch(parallel);
for(intindex=0;index CalcParallelRequestThreadclient=newCalcParallelRequestThread(executor,signal,finish,index);
newThread(client).start();
}
//10000个并发线程瞬间发起请求操作signal.countDown();
finish.await();
sw.stop();
Stringtip=String.format("RPC调用总共耗时:[%s]毫秒",sw.getTime());
System.out.println(tip);
executor.stop();
}
}
好了,现在先启动NettyRPC服务器,确认没有问题之后,运行并发RPC请求客户端,看下客户端打印的计算结果,以及处理耗时。
???
从上面来看,10000个瞬时RPC计算请求,总共耗时接近11秒。我们在来看下NettyRPC的服务端运行情况,如下所示:
???
可以很清楚地看到,RPC服务端都有收到客户端发起的RPC计算请求,并返回消息应答。
最后我们还是要分别验证一下,RPC服务端是否存在丢包、粘包、IO阻塞的情况?1W个并发计算请求,是否成功接收处理并应答了?实际情况说明一切,看下图所示:
????
?非常给力,RPC的服务端确实成功接收到了客户端发起的1W笔瞬时并发计算请求,并且成功应答处理了。并没有出现:丢包、粘包、IO阻塞的情况。再看下RPC客户端,是否成功得到计算结果的应答返回了呢?
很好,RPC的客户端,确实收到了RPC服务端计算的1W笔加法请求的计算结果,而且耗时接近11秒。由此可见,基于Netty+业务线程池的NettyRPC服务器,应对并发多线程RPC请求,处理起来是得心应手,游刃有余!
最后,本文通过Netty这个NIO框架,实现了一个很简单的“高性能”的RPC服务器,代码虽然写出来了,但是还是有一些值得改进的地方,比如:
1、对象序列化传输可以支持目前主流的序列化框架:protobuf、JBoss?Marshalling、Avro等等。
2、Netty的线程模型可以根据业务需求,进行定制。因为,并不是每笔业务都需要这么强大的并发处理性能。
3、目前RPC计算只支持一个RPC服务接口映射绑定一个对应的实现,后续要支持一对多的情况。
4、业务线程池的启动参数、线程池并发阻塞容器模型等等,可以配置化管理。
5、Netty的Handler处理部分,对于复杂的业务逻辑,现在是统一分派到特定的线程池进行后台异步处理。当然你还可以考虑JMS(消息队列)方式进行解耦,统一分派给消息队列的订阅者,统一处理。目前实现JMS的开源框架也有很多,ActiveMQ、RocketMQ等等,都可以考虑。
本文实现的NettyRPC,对于面前的您而言,一定还有很多地方,可以加以完善和改进,优化改进的工作就交给您自由发挥了。
由于本人技术能力、认知水平有限。本文中有说不对的地方,恳请园友们批评指正!不吝赐教!最后,感谢面前的您,耐心的阅读完本文,相信现在的你,对于Java开发高性能的服务端应用,又有了一个更深入的了解!本文算是对我Netty学习成果的阶段性总结,后续有时间,我还会继续推出Netty工业级开发的相关文章,敬请期待!
PS:还有兴趣的朋友可以参考、阅读一下,我的另外一篇文章:Netty实现高性能RPC服务器优化篇之消息序列化。此外,自从在博客园发表了两篇:基于Netty开发高性能RPC服务器的文章之后,本人收到很多园友们索要源代码进行学习交流的请求。为了方便大家,本人把NettyRPC的代码开源托管到github上面,欢迎有兴趣的朋友一起学习、研究!
|
|