分享

基于Google Protobuf的Netty编解码技术

 WindySky 2017-08-13

Google的Protobuf在业界非常流行,很多商业项目都选择Protobuf作为编解码框架,以下为Protobuf的一些优点:

    (1)在谷歌内长期使用,产品成熟度高。

    (2)跨语言,支持包括C++、Java、Python在内的多重语言。

    (3)编码后的码流小,便于存储和传输。

    (4)编解码性能高。

    (5)支持不同协议向前兼容。

    (6)支持定义可选和必选字段。

 

一、Protobuf开发环境搭建

    1、下载Protobuf的Windows版,网址如下:https://developers.google.com/protocol-buffers/docs/downloads?hl=zh-cn,本示例基于protoc-2.6.1-win32.zip

    2、下载Protobuf Java语言所需的jar包,网址如下:http://repo2./maven2/com/google/protobuf/protobuf-java/2.6.1/,本示例基于protobuf-java-2.6.1.jar

    3、新建请求响应所需的proto文件

    SubscribeReq.proto

Proto代码  收藏代码
  1. package netty;  
  2. option java_package = "com.serial.java.protobuf";  
  3. option java_outer_classname = "SubscribeReqProto";  
  4.   
  5. message SubscribeReq{  
  6.     required int32 subReqID = 1;  
  7.     required string userName = 2;  
  8.     required string productName = 3;  
  9.     repeated string address = 4;  
  10. }  

    SubscribeRespProto.proto

Proto代码  收藏代码
  1. package netty;  
  2. option java_package = "com.serial.java.protobuf";  
  3. option java_outer_classname = "SubscribeRespProto";  
  4. message SubscribeResp{  
  5.     required int32 subReqID = 1;  
  6.     required string respCode = 2;  
  7.     required string desc = 3;  
  8. }  

    4、通过Protoc.exe生成所需的Java编解码POJO文件,命令行如下。

Cmd代码  收藏代码
  1. C:\Users\Administrator>d:  
  2. D:\>cd "Program Files\protoc-2.6.1-win32"  
  3. D:\Program Files\protoc-2.6.1-win32>protoc.exe --java_out=.\src .\netty\Subscrib  
  4. eReq.proto  
  5. D:\Program Files\protoc-2.6.1-win32>protoc.exe --java_out=.\src .\netty\Subscrib  
  6. eResp.proto  
  7. D:\Program Files\protoc-2.6.1-win32>  

    5、将生成的Java POJO文件拷贝到项目中,注意Protobuf所需的jar包也需包含在项目中,不然会报错。

    6、创建测试类,测试Protobuf的编解码功能。

    TestSubscribeReq.java

Java代码  收藏代码
  1. package com.serial.java.test;  
  2.   
  3. import java.util.ArrayList;  
  4. import java.util.List;  
  5.   
  6. import com.google.protobuf.InvalidProtocolBufferException;  
  7. import com.serial.java.protobuf.SubscribeReqProto;  
  8.   
  9. public class TestSubscribeReq {  
  10.   
  11.     private static byte [] encode(SubscribeReqProto.SubscribeReq req){  
  12.         return req.toByteArray();  
  13.     }  
  14.       
  15.     private static SubscribeReqProto.SubscribeReq decode(byte [] body)   
  16.             throws InvalidProtocolBufferException{  
  17.         return SubscribeReqProto.SubscribeReq.parseFrom(body);  
  18.     }  
  19.       
  20.     private static SubscribeReqProto.SubscribeReq createSubscribeReq(){  
  21.         SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();  
  22.         builder.setSubReqID(1);  
  23.         builder.setUserName("leeka");  
  24.         builder.setProductName("Netty book");  
  25.           
  26.         List<String> address = new ArrayList<String>();  
  27.         address.add("Nanjing");  
  28.         address.add("Beijing");  
  29.         address.add("Hangzhou");  
  30.         builder.addAllAddress(address);  
  31.         return builder.build();  
  32.     }  
  33.       
  34.       
  35.     public static void main(String[] args)throws Exception {          
  36.         SubscribeReqProto.SubscribeReq req = createSubscribeReq();  
  37.         System.out.println("before encode:"+ req.toString());         
  38.         SubscribeReqProto.SubscribeReq req2 = decode(encode(req));        
  39.         System.out.println("after encode:"+ req2.toString());         
  40.         System.out.println("Assert equal: " + req2.equals(req));  
  41.           
  42.     }  
  43.       
  44. }  

    7、运行测试类,查看测试结果,控制台输出如下信息:

Consule代码  收藏代码
  1. before encode:subReqID: 1  
  2. userName: "leeka"  
  3. productName: "Netty book"  
  4. address: "Nanjing"  
  5. address: "Beijing"  
  6. address: "Hangzhou"  
  7.   
  8. after encode:subReqID: 1  
  9. userName: "leeka"  
  10. productName: "Netty book"  
  11. address: "Nanjing"  
  12. address: "Beijing"  
  13. address: "Hangzhou"  
  14.   
  15. Assert equal: true  

 

 二、Netty的Protobuf服务端和客户端开发

     服务端入口

Java代码  收藏代码
  1. package com.serial.java;  
  2.   
  3. import com.serial.java.protobuf.SubscribeReqProto;  
  4.   
  5. import io.netty.bootstrap.ServerBootstrap;  
  6. import io.netty.channel.ChannelFuture;  
  7. import io.netty.channel.ChannelInitializer;  
  8. import io.netty.channel.ChannelOption;  
  9. import io.netty.channel.EventLoopGroup;  
  10. import io.netty.channel.nio.NioEventLoopGroup;  
  11. import io.netty.channel.socket.SocketChannel;  
  12. import io.netty.channel.socket.nio.NioServerSocketChannel;  
  13. import io.netty.handler.codec.protobuf.ProtobufDecoder;  
  14. import io.netty.handler.codec.protobuf.ProtobufEncoder;  
  15. import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;  
  16. import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;  
  17. import io.netty.handler.logging.LogLevel;  
  18. import io.netty.handler.logging.LoggingHandler;  
  19.   
  20.   
  21. public class SubReqServer {  
  22.   
  23.     public void bind(int port)throws Exception{  
  24.           
  25.         //配置服务端NIO线程组  
  26.         EventLoopGroup bossGroup = new NioEventLoopGroup();  
  27.         EventLoopGroup workerGroup = new NioEventLoopGroup();  
  28.         try{  
  29.             ServerBootstrap b = new ServerBootstrap();  
  30.             b.group(bossGroup, workerGroup)  
  31.                 .channel(NioServerSocketChannel.class)  
  32.                 .option(ChannelOption.SO_BACKLOG, 1024)  
  33.                 .handler(new LoggingHandler(LogLevel.INFO))  
  34.                 .childHandler(new ChannelInitializer<SocketChannel>() {  
  35.   
  36.                     @Override  
  37.                     protected void initChannel(SocketChannel ch) throws Exception {  
  38.                         ch.pipeline()  
  39.                         .addLast(new ProtobufVarint32FrameDecoder())                          
  40.                         .addLast(new ProtobufDecoder(  
  41.                                 SubscribeReqProto.SubscribeReq.getDefaultInstance()))                         
  42.                         .addLast(new ProtobufVarint32LengthFieldPrepender())                          
  43.                         .addLast(new ProtobufEncoder())                       
  44.                         .addLast(new SubReqServerHandler());                          
  45.                     }  
  46.                       
  47.                 });  
  48.             //绑定端口,同步等待成功  
  49.             ChannelFuture f = b.bind(port).sync();  
  50.             //等待服务端监听端口关闭  
  51.             f.channel().closeFuture().sync();  
  52.               
  53.         }finally{  
  54.             //退出时释放资源  
  55.             bossGroup.shutdownGracefully();  
  56.             workerGroup.shutdownGracefully();  
  57.         }         
  58.     }  
  59.       
  60.     public static void main(String[] args) throws Exception{  
  61.         int port = 8085;  
  62.         if(args!=null && args.length > 0){  
  63.             port = Integer.valueOf(args[0]);  
  64.         }  
  65.         new SubReqServer().bind(port);        
  66.     }  
  67. }  

 

    服务端处理类

Java代码  收藏代码
  1. package com.serial.java;  
  2.   
  3. import com.serial.java.protobuf.SubscribeReqProto;  
  4. import com.serial.java.protobuf.SubscribeRespProto;  
  5.   
  6. import io.netty.channel.ChannelHandlerAdapter;  
  7. import io.netty.channel.ChannelHandlerContext;  
  8.   
  9. public class SubReqServerHandler extends ChannelHandlerAdapter {  
  10.   
  11.     @Override  
  12.     public void channelRead(ChannelHandlerContext ctx, Object msg)  
  13.             throws Exception {  
  14.         SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq)msg;  
  15.         //System.out.println("SubReqServerHandler channelRead:"+ req.getUserName());  
  16.         if("leeka".equalsIgnoreCase(req.getUserName())){  
  17.             System.out.println("service accept client subscribe req:["+ req +"]");  
  18.             ctx.writeAndFlush(resp(req.getSubReqID()));       
  19.         }  
  20.     }  
  21.       
  22.     private SubscribeRespProto.SubscribeResp resp(int subReqID){  
  23.         SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder();  
  24.         builder.setSubReqID(subReqID);  
  25.         builder.setRespCode("0");  
  26.         builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address");  
  27.         return builder.build();  
  28.     }  
  29.       
  30.     @Override  
  31.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)  
  32.             throws Exception {  
  33.         cause.printStackTrace();  
  34.         ctx.close();  
  35.     }  
  36.       
  37. }  

    

    客户端入口

Java代码  收藏代码
  1. package com.serial.java;  
  2.   
  3. import com.serial.java.protobuf.SubscribeRespProto;  
  4.   
  5. import io.netty.bootstrap.Bootstrap;  
  6. import io.netty.channel.ChannelFuture;  
  7. import io.netty.channel.ChannelInitializer;  
  8. import io.netty.channel.ChannelOption;  
  9. import io.netty.channel.EventLoopGroup;  
  10. import io.netty.channel.nio.NioEventLoopGroup;  
  11. import io.netty.channel.socket.SocketChannel;  
  12. import io.netty.channel.socket.nio.NioSocketChannel;  
  13. import io.netty.handler.codec.protobuf.ProtobufDecoder;  
  14. import io.netty.handler.codec.protobuf.ProtobufEncoder;  
  15. import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;  
  16. import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;  
  17.   
  18. public class SubReqClient {  
  19.       
  20.     public void connect(int port,String host)throws Exception{  
  21.           
  22.         //配置客户端NIO线程组  
  23.         EventLoopGroup group = new NioEventLoopGroup();  
  24.           
  25.         try{  
  26.             Bootstrap b = new Bootstrap();  
  27.             b.group(group).channel(NioSocketChannel.class)  
  28.                 .option(ChannelOption.TCP_NODELAY, true)  
  29.                 .handler(new ChannelInitializer<SocketChannel>() {  
  30.                     @Override  
  31.                     protected void initChannel(SocketChannel ch) throws Exception {  
  32.                         ch.pipeline()  
  33.                         .addLast(new ProtobufVarint32FrameDecoder())                          
  34.                         .addLast(new ProtobufDecoder(  
  35.                                 SubscribeRespProto.SubscribeResp.getDefaultInstance()))                       
  36.                         .addLast(new ProtobufVarint32LengthFieldPrepender())                          
  37.                         .addLast(new ProtobufEncoder())                       
  38.                         .addLast(new SubReqClientHandler());  
  39.                     };  
  40.                 });  
  41.               
  42.             //发起异步连接操作  
  43.             ChannelFuture f = b.connect(host,port).sync();  
  44.             //等待客户端链路关闭  
  45.             f.channel().closeFuture().sync();  
  46.         }finally{  
  47.             //退出,释放资源  
  48.             group.shutdownGracefully();  
  49.         }  
  50.           
  51.     }  
  52.       
  53.     public static void main(String[] args)throws Exception {  
  54.         int port = 8085;  
  55.         if(args!=null && args.length > 0){  
  56.             port = Integer.valueOf(args[0]);  
  57.         }  
  58.         new SubReqClient().connect(port, "127.0.0.1");        
  59.     }  
  60. }  

 

    客户端处理类

Java代码  收藏代码
  1. package com.serial.java;  
  2.   
  3. import java.util.ArrayList;  
  4. import java.util.List;  
  5. import java.util.logging.Logger;  
  6.   
  7. import com.serial.java.protobuf.SubscribeReqProto;  
  8.   
  9. import io.netty.channel.ChannelHandlerAdapter;  
  10. import io.netty.channel.ChannelHandlerContext;  
  11.   
  12. public class SubReqClientHandler extends ChannelHandlerAdapter {  
  13.   
  14.     private static final Logger logger = Logger.getLogger(SubReqClientHandler.class.getName());  
  15.       
  16.     public SubReqClientHandler() {    
  17.           
  18.     }  
  19.       
  20.     @Override  
  21.     public void channelActive(ChannelHandlerContext ctx) throws Exception {  
  22.         for (int i = 0; i < 10; i++) {  
  23.             ctx.write(req(i));  
  24.         }  
  25.         ctx.flush();  
  26.     }  
  27.       
  28.     private SubscribeReqProto.SubscribeReq req(int i){  
  29.         SubscribeReqProto.SubscribeReq.Builder r = SubscribeReqProto.SubscribeReq.newBuilder();  
  30.         r.setSubReqID(i);  
  31.         r.setProductName("Netty Book"+i);  
  32.         r.setUserName("leeka");  
  33.           
  34.         List<String> address = new ArrayList<String>();  
  35.         address.add("Nanjing");  
  36.         address.add("Beijing");  
  37.         r.addAllAddress(address);         
  38.         return r.build();  
  39.     }  
  40.       
  41.     @Override  
  42.     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  
  43.         //super.channelReadComplete(ctx);  
  44.         ctx.flush();  
  45.     }  
  46.       
  47.     @Override  
  48.     public void channelRead(ChannelHandlerContext ctx, Object msg)  
  49.             throws Exception {  
  50.         System.out.println("receive server response:["+msg+"]");  
  51.     }  
  52.       
  53.     @Override  
  54.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)  
  55.             throws Exception {  
  56.         logger.warning("unexpected exception from downstream:"+ cause.getMessage());  
  57.         ctx.close();  
  58.     }  
  59.       
  60. }  

 

OVER

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多