Google的Protobuf在业界非常流行,很多商业项目都选择Protobuf作为编解码框架,以下为Protobuf的一些优点:
(1)在谷歌内长期使用,产品成熟度高。
(2)跨语言,支持包括C++、Java、Python在内的多重语言。
(3)编码后的码流小,便于存储和传输。
(4)编解码性能高。
(5)支持不同协议向前兼容。
(6)支持定义可选和必选字段。
一、Protobuf开发环境搭建
1、下载Protobuf的Windows版,网址如下:https://developers.google.com/protocol-buffers/docs/downloads?hl=zh-cn,本示例基于protoc-2.6.1-win32.zip
2、下载Protobuf Java语言所需的jar包,网址如下:http://repo2./maven2/com/google/protobuf/protobuf-java/2.6.1/,本示例基于protobuf-java-2.6.1.jar。
3、新建请求响应所需的proto文件
SubscribeReq.proto
- package netty;
- option java_package = "com.serial.java.protobuf";
- option java_outer_classname = "SubscribeReqProto";
-
- message SubscribeReq{
- required int32 subReqID = 1;
- required string userName = 2;
- required string productName = 3;
- repeated string address = 4;
- }
SubscribeRespProto.proto
- package netty;
- option java_package = "com.serial.java.protobuf";
- option java_outer_classname = "SubscribeRespProto";
- message SubscribeResp{
- required int32 subReqID = 1;
- required string respCode = 2;
- required string desc = 3;
- }
4、通过Protoc.exe生成所需的Java编解码POJO文件,命令行如下。
- C:\Users\Administrator>d:
- D:\>cd "Program Files\protoc-2.6.1-win32"
- D:\Program Files\protoc-2.6.1-win32>protoc.exe --java_out=.\src .\netty\Subscrib
- eReq.proto
- D:\Program Files\protoc-2.6.1-win32>protoc.exe --java_out=.\src .\netty\Subscrib
- eResp.proto
- D:\Program Files\protoc-2.6.1-win32>
5、将生成的Java POJO文件拷贝到项目中,注意Protobuf所需的jar包也需包含在项目中,不然会报错。
6、创建测试类,测试Protobuf的编解码功能。
TestSubscribeReq.java
- package com.serial.java.test;
-
- import java.util.ArrayList;
- import java.util.List;
-
- import com.google.protobuf.InvalidProtocolBufferException;
- import com.serial.java.protobuf.SubscribeReqProto;
-
- public class TestSubscribeReq {
-
- private static byte [] encode(SubscribeReqProto.SubscribeReq req){
- return req.toByteArray();
- }
-
- private static SubscribeReqProto.SubscribeReq decode(byte [] body)
- throws InvalidProtocolBufferException{
- return SubscribeReqProto.SubscribeReq.parseFrom(body);
- }
-
- private static SubscribeReqProto.SubscribeReq createSubscribeReq(){
- SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
- builder.setSubReqID(1);
- builder.setUserName("leeka");
- builder.setProductName("Netty book");
-
- List<String> address = new ArrayList<String>();
- address.add("Nanjing");
- address.add("Beijing");
- address.add("Hangzhou");
- builder.addAllAddress(address);
- return builder.build();
- }
-
-
- public static void main(String[] args)throws Exception {
- SubscribeReqProto.SubscribeReq req = createSubscribeReq();
- System.out.println("before encode:"+ req.toString());
- SubscribeReqProto.SubscribeReq req2 = decode(encode(req));
- System.out.println("after encode:"+ req2.toString());
- System.out.println("Assert equal: " + req2.equals(req));
-
- }
-
- }
7、运行测试类,查看测试结果,控制台输出如下信息:
- before encode:subReqID: 1
- userName: "leeka"
- productName: "Netty book"
- address: "Nanjing"
- address: "Beijing"
- address: "Hangzhou"
-
- after encode:subReqID: 1
- userName: "leeka"
- productName: "Netty book"
- address: "Nanjing"
- address: "Beijing"
- address: "Hangzhou"
-
- Assert equal: true
二、Netty的Protobuf服务端和客户端开发
服务端入口
- package com.serial.java;
-
- import com.serial.java.protobuf.SubscribeReqProto;
-
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.codec.protobuf.ProtobufDecoder;
- import io.netty.handler.codec.protobuf.ProtobufEncoder;
- import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
- import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
- import io.netty.handler.logging.LogLevel;
- import io.netty.handler.logging.LoggingHandler;
-
-
- public class SubReqServer {
-
- public void bind(int port)throws Exception{
-
- //配置服务端NIO线程组
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try{
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG, 1024)
- .handler(new LoggingHandler(LogLevel.INFO))
- .childHandler(new ChannelInitializer<SocketChannel>() {
-
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline()
- .addLast(new ProtobufVarint32FrameDecoder())
- .addLast(new ProtobufDecoder(
- SubscribeReqProto.SubscribeReq.getDefaultInstance()))
- .addLast(new ProtobufVarint32LengthFieldPrepender())
- .addLast(new ProtobufEncoder())
- .addLast(new SubReqServerHandler());
- }
-
- });
- //绑定端口,同步等待成功
- ChannelFuture f = b.bind(port).sync();
- //等待服务端监听端口关闭
- f.channel().closeFuture().sync();
-
- }finally{
- //退出时释放资源
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
- }
-
- public static void main(String[] args) throws Exception{
- int port = 8085;
- if(args!=null && args.length > 0){
- port = Integer.valueOf(args[0]);
- }
- new SubReqServer().bind(port);
- }
- }
服务端处理类
- package com.serial.java;
-
- import com.serial.java.protobuf.SubscribeReqProto;
- import com.serial.java.protobuf.SubscribeRespProto;
-
- import io.netty.channel.ChannelHandlerAdapter;
- import io.netty.channel.ChannelHandlerContext;
-
- public class SubReqServerHandler extends ChannelHandlerAdapter {
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg)
- throws Exception {
- SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq)msg;
- //System.out.println("SubReqServerHandler channelRead:"+ req.getUserName());
- if("leeka".equalsIgnoreCase(req.getUserName())){
- System.out.println("service accept client subscribe req:["+ req +"]");
- ctx.writeAndFlush(resp(req.getSubReqID()));
- }
- }
-
- private SubscribeRespProto.SubscribeResp resp(int subReqID){
- SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder();
- builder.setSubReqID(subReqID);
- builder.setRespCode("0");
- builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address");
- return builder.build();
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
- throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
-
- }
客户端入口
- package com.serial.java;
-
- import com.serial.java.protobuf.SubscribeRespProto;
-
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import io.netty.handler.codec.protobuf.ProtobufDecoder;
- import io.netty.handler.codec.protobuf.ProtobufEncoder;
- import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
- import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
-
- public class SubReqClient {
-
- public void connect(int port,String host)throws Exception{
-
- //配置客户端NIO线程组
- EventLoopGroup group = new NioEventLoopGroup();
-
- try{
- Bootstrap b = new Bootstrap();
- b.group(group).channel(NioSocketChannel.class)
- .option(ChannelOption.TCP_NODELAY, true)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline()
- .addLast(new ProtobufVarint32FrameDecoder())
- .addLast(new ProtobufDecoder(
- SubscribeRespProto.SubscribeResp.getDefaultInstance()))
- .addLast(new ProtobufVarint32LengthFieldPrepender())
- .addLast(new ProtobufEncoder())
- .addLast(new SubReqClientHandler());
- };
- });
-
- //发起异步连接操作
- ChannelFuture f = b.connect(host,port).sync();
- //等待客户端链路关闭
- f.channel().closeFuture().sync();
- }finally{
- //退出,释放资源
- group.shutdownGracefully();
- }
-
- }
-
- public static void main(String[] args)throws Exception {
- int port = 8085;
- if(args!=null && args.length > 0){
- port = Integer.valueOf(args[0]);
- }
- new SubReqClient().connect(port, "127.0.0.1");
- }
- }
客户端处理类
- package com.serial.java;
-
- import java.util.ArrayList;
- import java.util.List;
- import java.util.logging.Logger;
-
- import com.serial.java.protobuf.SubscribeReqProto;
-
- import io.netty.channel.ChannelHandlerAdapter;
- import io.netty.channel.ChannelHandlerContext;
-
- public class SubReqClientHandler extends ChannelHandlerAdapter {
-
- private static final Logger logger = Logger.getLogger(SubReqClientHandler.class.getName());
-
- public SubReqClientHandler() {
-
- }
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- for (int i = 0; i < 10; i++) {
- ctx.write(req(i));
- }
- ctx.flush();
- }
-
- private SubscribeReqProto.SubscribeReq req(int i){
- SubscribeReqProto.SubscribeReq.Builder r = SubscribeReqProto.SubscribeReq.newBuilder();
- r.setSubReqID(i);
- r.setProductName("Netty Book"+i);
- r.setUserName("leeka");
-
- List<String> address = new ArrayList<String>();
- address.add("Nanjing");
- address.add("Beijing");
- r.addAllAddress(address);
- return r.build();
- }
-
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- //super.channelReadComplete(ctx);
- ctx.flush();
- }
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg)
- throws Exception {
- System.out.println("receive server response:["+msg+"]");
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
- throws Exception {
- logger.warning("unexpected exception from downstream:"+ cause.getMessage());
- ctx.close();
- }
-
- }
OVER
|