分享

netty + protobuf 传输多个类

 WindySky 2019-02-28

在protobuf序列化的前面,加上一个自定义的头,这个头包含序列化的长度和它的类型。在解压的时候根据包头来反序列化。

假设socket上要传输2个类型的数据,股票行情信息和期权行情信息:

股票的.proto定义:

syntax = "proto3";

package test.model.protobuf;

option java_package = "test.model.protobuf";

message StockTick {

    string stockId = 1;

    int price = 2;

}

期权的.proto定义:

syntax = "proto3";

package test.model.protobuf;

option java_package = "test.model.protobuf";

message OptionTick {

    string optionId = 1;

    string securityId = 2;

    int price = 3;

}

netty4官方事实上已经实现了protobuf的编解码的插件,但是只能用于传输单一类型的protobuf序列化。我这里截取一段netty代码,熟悉netty的同学马上就能理解它的作用:

        @Override

        protected void initChannel(SocketChannel ch) throws Exception {

            ChannelPipeline pipeline = ch.pipeline();

            pipeline.addLast(new ProtobufVarint32FrameDecoder());

            pipeline.addLast(new ProtobufDecoder(StockTickOuterClass.StockTick.getDefaultInstance()));

            pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());

            pipeline.addLast(new ProtobufEncoder());

            pipeline.addLast(new CustomProtoServerHandler());

        }

看以上代码高亮部分,netty4官方的编解码器必须指定单一的protobuf类型才行。具体每个类的作用:

ProtobufEncoder:用于对Probuf类型序列化。

ProtobufVarint32LengthFieldPrepender:用于在序列化的字节数组前加上一个简单的包头,只包含序列化的字节长度。

ProtobufVarint32FrameDecoder:用于decode前解决半包和粘包问题(利用包头中的包含数组长度来识别半包粘包)

ProtobufDecoder:反序列化指定的Probuf字节数组为protobuf类型。

我们可以参考以上官方的编解码代码,将实现我们客户化的protobuf编解码插件,但是要支持多种不同类型protobuf数据在一个socket上传输:

编码器CustomProtobufEncoder:

import com.google.protobuf.MessageLite;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandler.Sharable;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.MessageToByteEncoder;

/**

 * 参考ProtobufVarint32LengthFieldPrepender 和 ProtobufEncoder

 */

@Sharable

public class CustomProtobufEncoder extends MessageToByteEncoder<MessageLite> {

HangqingEncoder hangqingEncoder;

    public CustomProtobufEncoder(HangqingEncoder hangqingEncoder)

    {

        this.hangqingEncoder = hangqingEncoder;

    }

    @Override

    protected void encode(

            ChannelHandlerContext ctx, MessageLite msg, ByteBuf out) throws Exception {

        byte[] body = msg.toByteArray();

        byte[] header = encodeHeader(msg, (short)body.length);

        out.writeBytes(header);

        out.writeBytes(body);

        return;

    }

    private byte[] encodeHeader(MessageLite msg, short bodyLength) {

        byte messageType = 0x0f;

        if (msg instanceof StockTickOuterClass.StockTick) {

            messageType = 0x00;

        } else if (msg instanceof OptionTickOuterClass.OptionTick) {

            messageType = 0x01;

        }

        byte[] header = new byte[4];

        header[0] = (byte) (bodyLength & 0xff);

        header[1] = (byte) ((bodyLength >> 8) & 0xff);

        header[2] = 0; // 保留字段

        header[3] = messageType;

        return header;

    }

}

CustomProtobufEncoder序列化传入的protobuf类型,并且为它创建了一个4个字节的包头,格式如下

body长度(low) body长度

(high) 保留字节 类型

其中的encodeHeader方法具体的实现要根据你要传输哪些protobuf类型来修改代码,也可以稍加设计避免使用太多的if…else。

解码器CustomProtobufDecoder:

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

import com.google.protobuf.MessageLite;

/**

 * 参考ProtobufVarint32FrameDecoder 和 ProtobufDecoder

 */

public class CustomProtobufDecoder extends ByteToMessageDecoder {

    @Override

    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

        while (in.readableBytes() > 4) { // 如果可读长度小于包头长度,退出。

            in.markReaderIndex();

            // 获取包头中的body长度

            byte low = in.readByte();

            byte high = in.readByte();

            short s0 = (short) (low & 0xff);

            short s1 = (short) (high & 0xff);

            s1 <<= 8;

            short length = (short) (s0 | s1);

            // 获取包头中的protobuf类型

            in.readByte();

            byte dataType = in.readByte();

            // 如果可读长度小于body长度,恢复读指针,退出。

            if (in.readableBytes() < length) {

                in.resetReaderIndex();

                return;

            }

            // 读取body

            ByteBuf bodyByteBuf = in.readBytes(length);

            byte[] array;

            int offset;

            int readableLen= bodyByteBuf.readableBytes();

            if (bodyByteBuf.hasArray()) {

                array = bodyByteBuf.array();

                offset = bodyByteBuf.arrayOffset() + bodyByteBuf.readerIndex();

            } else {

                array = new byte[readableLen];

                bodyByteBuf.getBytes(bodyByteBuf.readerIndex(), array, 0, readableLen);

                offset = 0;

            }

            //反序列化

            MessageLite result = decodeBody(dataType, array, offset, readableLen);

            out.add(result);

        }

    }

    public MessageLite decodeBody(byte dataType, byte[] array, int offset, int length) throws Exception {

        if (dataType == 0x00) {

            return StockTickOuterClass.StockTick.getDefaultInstance().

                    getParserForType().parseFrom(array, offset, length);

        } else if (dataType == 0x01) {

            return OptionTickOuterClass.OptionTick.getDefaultInstance().

                    getParserForType().parseFrom(array, offset, length);

        }

        return null; // or throw exception

    }

}

CustomProtobufDecoder实现了2个功能,1)通过包头中的长度信息来解决半包和粘包。 2)把消息body反序列化为对应的protobuf类型(根据包头中的类型信息)。

其中的decodeBody方法具体的实现要根据你要传输哪些protobuf类型来修改代码,也可以稍加设计避免使用太多的if…else。

在Netty服务器上应用编解码器

如何把我们自定义的编解码用于netty Server:

        @Override

        protected void initChannel(SocketChannel ch) throws Exception {

            ChannelPipeline pipeline = ch.pipeline();

            pipeline.addLast("decoder",new CustomProtobufDecoder());

            pipeline.addLast("encoder",new CustomProtobufEncoder());

            pipeline.addLast(new CustomProtoServerHandler());

        }

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多