分享

netty+protobuf使用netty自带编解码器完成多种协议格式分发

 WindySky 2019-02-28

proto文件

Example.proto

package example2.proto;

message BaseData {

required Header header = 1;

extensions 100 to 99999;

}

enum Header {

//装备升级

Msg1001 = 1001;

//装备穿戴

Msg1002 = 1002;

//添加好友

Msg1003 = 1003;

}

Friend.proto

package example2.proto;

import "example2/proto/Example.proto";

extend BaseData {

optional Receive1003 receive1003 = 10031;

optional Send1003 send1003 = 10032;

}

message Receive1003 { 

  required int32 friendId = 1;

}

message Send1003 { 

  required int32 friendId = 1;

  required int32 state = 2;

}

NettyServer服务器

package example2.server;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.codec.protobuf.ProtobufDecoder;

import io.netty.handler.codec.protobuf.ProtobufEncoder;

import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;

import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

import java.util.logging.Level;

import java.util.logging.Logger;

import com.google.protobuf.ExtensionRegistry;

import example2.proto.Equip;

import example2.proto.Example;

import example2.proto.Friend;

import example2.server.handler.ProtoBufServerHandler;

public class NettyServer {

private static final int PORT = 1588;

private static Logger logger = Logger.getLogger(NettyServer.class.getName());

public void start(int port) throws Exception {

EventLoopGroup bossGroup = new NioEventLoopGroup();

EventLoopGroup workerGroup = new NioEventLoopGroup();

try {

ServerBootstrap b = new ServerBootstrap();

b.group(bossGroup, workerGroup);

b.channel(NioServerSocketChannel.class);

b.option(ChannelOption.TCP_NODELAY, true);

b.option(ChannelOption.SO_BACKLOG, 1024);

b.childHandler(new ChannelInitializer<SocketChannel>() {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

//decoded

ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());

ExtensionRegistry registry = ExtensionRegistry.newInstance();

Equip.registerAllExtensions(registry);

Friend.registerAllExtensions(registry);

ch.pipeline().addLast(new ProtobufDecoder(Example.BaseData.getDefaultInstance(), registry));

//encoded

ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());

ch.pipeline().addLast(new ProtobufEncoder());

// 注册handler

ch.pipeline().addLast(new ProtoBufServerHandler());

}

});

//绑定端口 同步等待成功

ChannelFuture f = b.bind(port).sync();

//等待服务端监听端口关闭

f.channel().closeFuture().sync();

} finally {

workerGroup.shutdownGracefully();

bossGroup.shutdownGracefully();

}

}

public static void main(String[] args) throws Exception {

logger.log(Level.INFO, "NettyServer start...");

new NettyServer().start(PORT);

}

}

ProtoBufServerHandler

package example2.server.handler;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

import java.util.HashMap;

import java.util.Map;

import example2.handlers.AbstractHandler;

import example2.handlers.EquipHandler;

import example2.handlers.FriendHandler;

import example2.proto.Example;

import example2.proto.Example.BaseData;

import example2.proto.Example.Header;

public class ProtoBufServerHandler extends SimpleChannelInboundHandler<Example.BaseData> {

//不同类型处理器,应该在服务器启动的时候就加载好对应关系

private static Map<Example.Header, AbstractHandler> headersMap;

static {

headersMap = new HashMap<Example.Header, AbstractHandler>();

headersMap.put(Header.Msg1001, new EquipHandler());

headersMap.put(Header.Msg1002, new EquipHandler());

headersMap.put(Header.Msg1003, new FriendHandler());

}

@Override

protected void messageReceived(ChannelHandlerContext ctx, BaseData baseData) throws Exception {

//需要放到单独的分发器中处理

AbstractHandler abstractHandler = headersMap.get(baseData.getHeader());

if (abstractHandler == null) {

System.err.println("没有找到消息处理器!!");

} else {

abstractHandler.handleMsg(ctx, baseData);

}

}

}

抽象处理器AbstractHandler

package example2.handlers;

import io.netty.channel.ChannelHandlerContext;

import example2.proto.Example.BaseData;

public abstract class AbstractHandler {

public void handleMsg(ChannelHandlerContext ctx, BaseData baseData) {

try {

Object object = handle(baseData);

ctx.channel().writeAndFlush(object);

} catch (Exception e) {

e.printStackTrace();

}

}

/**

* 处理消息

* @param messageVo

*/

public abstract Object handle(BaseData baseData) throws Exception;

}

FriendHandler处理器

package example2.handlers;

import com.google.protobuf.InvalidProtocolBufferException;

import example2.proto.Example;

import example2.proto.Example.BaseData;

import example2.proto.Friend;

public class FriendHandler extends AbstractHandler {

@Override

public Object handle(BaseData baseData) throws InvalidProtocolBufferException {

switch (baseData.getHeader()) {

case Msg1003:

return addFriend(baseData);

}

return null;

}

private Object addFriend(BaseData baseData) {

Friend.Receive1003 extension = baseData.getExtension(Friend.receive1003);

System.err.println("1003消息接收成功,我要返回消息了----" + extension.getFriendId());

Friend.Send1003.Builder sendMsg = Friend.Send1003.newBuilder();

sendMsg.setFriendId(extension.getFriendId()).setState(0);

Example.BaseData.Builder builder = Example.BaseData.newBuilder();

builder.setHeader(Example.Header.Msg1003);

builder.setExtension(Friend.send1003, sendMsg.build());

return builder.build();

}

}

客户端跟服务器差不多的,就不上代码了。

这样一套消息处理,如果消息很多,感觉加起来会比较麻烦,大神给指点下怎么样优化可以使功能实现更简洁,万分感谢!

--------------------- 

作者:醉从零 

来源:CSDN 

原文:https://blog.csdn.net/woshiicesky/article/details/78044535 

版权声明:本文为博主原创文章,转载请附上博文链接!

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多