分享

Android Netty框架的使用

 柠檬冰啡咖 2017-12-28

Netty框架的使用

1 TCP开发范例

  发送地址---192.168.31.241
  发送端口号---9223

  发送数据

复制代码
{
    "userid":"mm910@mbk.com",
    "devicetype":3,
    "accounttype":0,
    "username":"",
    "password":"e10adc3949ba59abbe56e057f20f883e",
    "meiid":1000217,
    "deviceid":"864376025909275"
}
复制代码

  接受数据

{
    "message":"登录成功",
    "sessionkey":"EF81E1BD132D40DE8F1707A521D8B5A6",
    "mainsn":"C001B00010000002",
    "code":0
}

2 上代码

1 业务层代码

复制代码
public class MainActivity extends Activity {
  
  private Base1106Entity entity1106;// 登录云棒协议
  
  public static final int RESPONSE_SUCCESS = 0x401;
  public static final int RESPONSE_FAIL = 0x402;
  public static final int RESPONSE_TIMEOUT = 0x403;
  public static final int REQUEST_HEARTBEAT_TIMEOUT = 0x410; //心跳超时
  public static final int  NOT_LOGIN= 0x411; //用户未登录
  
  public Handler mHandler = new Handler() {

    @Override
    public void handleMessage(Message msg) {
      super.handleMessage(msg);
      switch (msg.what) {
        case  RESPONSE_SUCCESS:
          IEntity entity = (IEntity) msg.obj;
          if (entity != null) {
            responseSuccess((IEntity) msg.obj);
          } else {
            responseFail(-1, "返回数据为空!");
          }
          break;
        case   RESPONSE_FAIL:// 请求失败
          if (msg != null && msg.obj != null)
            responseFail(-10001, (String) msg.obj);
          break;
        case   RESPONSE_TIMEOUT:// 请求超时
          if (msg != null && msg.obj != null)
            responseFail(-10000, (String) msg.obj);
          break;
        case   NOT_LOGIN:// 用户未登录
          if (msg != null && msg.obj != null)
            responseFail(-10002, (String) msg.obj);
          break;
      }
    }
  };
  

  @Override
  protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);
    
    Button login = (Button)findViewById(R.id.login);
    login.setOnClickListener(new View.OnClickListener() {
      
      @Override
      public void onClick(View v) {
        reqEntity1106();
      }
    });
    
    
  }

  
  public void reqEntity1106() {
    entity1106 = new Base1106Entity();
    entity1106.setMeiid(1000217);
    entity1106.setUserid("mm910@mbk.com");
    entity1106.setUsername("");
    entity1106.setPassword("e10adc3949ba59abbe56e057f20f883e");
    entity1106.setAccounttype( 0 );
    entity1106.setDevicetype(3);
    entity1106.setDeviceid("864376025909275");
    entity1106.setHandler(mHandler);
    ClientConnectFactory.getInstance().sendEntity(entity1106);
  }
  
  public void responseSuccess(IEntity entity) {
    Toast.makeText(MainActivity.this,  ((Base1106Entity)entity).toString(), Toast.LENGTH_LONG).show();
  }
  
  public void responseFail(int code, String msg) {
    Toast.makeText(MainActivity.this, msg, Toast.LENGTH_SHORT).show();
  }
  
  
}
复制代码
复制代码
public class MeiApp extends Application{
  
  public static Context mContext;
  
  @Override
  public void onCreate() {
    super.onCreate();
    mContext = this;
    
    ClientConnectFactory.getInstance().init(mContext);
  }
  
  
}
复制代码

2 业务通讯层代码

复制代码
public interface IClientConnect {

  public void isConnect(String netType);

  public void sendAgain();

  public void sendMsgFail(String netType, byte[] msg);

  public void connectFail(String netType);

  // 根据实体发送数据
  public void sendEntity(IEntity entity);

  public void sendByte(byte[] b);

  // 关闭
  public void isClose();

  // 清除当前数据
  public void isClearMsg();

  public void callBack(PackageHeader header, byte[] data, String desc, int type);

  public void callBack(IEntity entity, String desc);
}
复制代码
复制代码
public abstract class BaseClientMgr extends Subject implements IClientConnect {

  protected boolean isRunning; // 当前是否正在连接
  protected boolean isSending; // 是否正在发送 线程是否被占用
  private int mPort; // 连接服务器的端口号
  private int mCommunication; // 通讯类型
  private int heartTimeOutCount = 0; // 记录心跳超时次数
  protected int function = 1200; // 关闭连接功能号

  public static final int RESPONSE_SUCCESS = 0x401;
  public static final int RESPONSE_FAIL = 0x402;
  public static final int RESPONSE_TIMEOUT = 0x403;
  public static final int REQUEST_HEARTBEAT_TIMEOUT = 0x410; // 心跳超时
  public static final int NOT_LOGIN = 0x411; // 用户未登录

  private String mConnectKey = "BasicServicesMgr";
  private String mHost; // 连接服务器的IP地址
  protected ArrayList<IEntity> mEntityMsg = null; // 待发送消息集合

  protected Context mContext; // Context对象
  protected CommunicationThreadManager mManager; // 该通讯层管理器
  protected ParseByteThread mParseByteThread = null; // 数据解析线程
  protected ExecutorService executor; // 线程连接池

  protected BaseClientMgr(String host, int port, String key) {
    init(host, port, key);
  }

  // 初始化
  private void init(String host, int port, String key) {
    this.mContext = MeiApp.mContext;
    isRunning = false;
    isSending = false;
    mHost = host;
    mPort = port;
    mConnectKey = key;
    mEntityMsg = new ArrayList<IEntity>();
    executor = Executors.newFixedThreadPool(10);
    mParseByteThread = new ParseByteThread(this);
    executor.execute(mParseByteThread);
  }

  protected Handler basicHandler = new Handler() {

    @Override
    public void handleMessage(Message msg) {
      super.handleMessage(msg);
      switch (msg.what) {
        case ClientConstants.REQUEST:
          // 发送请求 连接占用
          if (mEntityMsg != null && mEntityMsg.size() > 0) {
            isSending = true;
            // 清除handler的消息
            basicHandler.removeMessages(ClientConstants.REQUEST);
            basicHandler.removeMessages(ClientConstants.REQUEST_CREATE_CONNECT);
            basicHandler.removeMessages(ClientConstants.REQUEST_SEND_MESSAGE);
            // 请求类型 当为网络请求时判断网络状态 建立连接
            // 检查连接是否可用
            if (isRunning) {
              // 直接发送消息
              basicHandler.removeMessages(ClientConstants.REQUEST_SEND_MESSAGE);
              basicHandler.sendEmptyMessage(ClientConstants.REQUEST_SEND_MESSAGE);
            } else {
              // 建立连接
              basicHandler.removeMessages(ClientConstants.REQUEST_CREATE_CONNECT);
              Message msgCreate = Message.obtain();
              msgCreate.what = ClientConstants.REQUEST_CREATE_CONNECT;
              msgCreate.arg1 = 0;
              basicHandler.sendMessage(msgCreate);
            }

          }
          break;
        case ClientConstants.REQUEST_CREATE_CONNECT:
          // 建立连接
          Log.i("mbk", "建立连接!");
          isConnect("netty");

          break;
        case ClientConstants.REQUEST_SEND_MESSAGE:
          // 发送消息
          Log.i("mbk", "发送消息!");
          if (isRunning) {
            if (mEntityMsg.size() > 0) {
              Log.i("mbk", "发送数据!");
              sendData(mEntityMsg.get(0));
              basicHandler.removeMessages(ClientConstants.REQUEST_TIMEOUT);
              // 设置请求超时
              basicHandler.sendEmptyMessageDelayed(ClientConstants.REQUEST_TIMEOUT, 3000);
            } else {
              Log.i("mbk", "数据发送完成!");
              isSending = false;
            }
          } else {
            // 重新建立连接
            basicHandler.removeMessages(ClientConstants.REQUEST_CREATE_CONNECT);
            basicHandler.sendEmptyMessage(ClientConstants.REQUEST_CREATE_CONNECT);
          }
          break;
        case ClientConstants.REQUEST_SEND_HEARTBEAT:
          Log.i("mbk", "发送心跳!");
          mManager.sendHeart(function);
          heartTimeOutCount++;
          Log.i("lzy02", "heartTimeOutCount---------------" + heartTimeOutCount);
          if (heartTimeOutCount >= 3) {// 大于等于3则认为与云棒无连接
            callBack(null, null, "心跳超时!", REQUEST_HEARTBEAT_TIMEOUT);
          }
          // // 发送心跳
          basicHandler.removeMessages(ClientConstants.REQUEST_SEND_HEARTBEAT);
          basicHandler.sendEmptyMessageDelayed(ClientConstants.REQUEST_SEND_HEARTBEAT, 3000);

          break;
        case ClientConstants.REQUEST_TIMEOUT:// 请求超时
          Log.i("mbk", "请求超时!");
          isRunning = false;
          callBack(null, null, "请求超时!", RESPONSE_TIMEOUT);
          break;

      }
    }
  };

  public void sendHeartbeat(int function) {
    this.function = function;
  }

  public void sendData(IEntity entity) {
    sendByte(ClientSocketUtils.sendDatas(mEntityMsg.get(0)));
  }

  // 建立连接
  @Override
  public void isConnect(String netType) {
    UdpEntity udpEntity = null;
    int type = CommunicationThreadManager.MBK_COMMUNICATION_NETTY;
    if (netType.equals("netty")) {
      // 建立一个netty连接
      type = CommunicationThreadManager.MBK_COMMUNICATION_NETTY;

      mManager = new CommunicationThreadManager(mContext, null, mConnectKey, "192.168.31.241", mPort, type, mCommunicationCallBack);

      Log.i("mbk", "发送地址---" + "192.168.31.241");
      Log.i("mbk", "发送端口号---" + mPort);

      /*
       * if (udpEntity != null) { Log.i("lzy02",
       * "udpEntity---209----------udpEntity=="+udpEntity.getYunbangIp());
       * mManager = new CommunicationThreadManager(mContext, null, mConnectKey,
       * "192.168.31.241", mPort, type, mCommunicationCallBack);
       * //Toast.makeText(mContext, "已通过Netty发送 ", Toast.LENGTH_SHORT).show();
       * Log.i("mbk","netty发送云棒IP号---" + udpEntity.getYunbangIp()); } else {
       * Log.i("lzy02", "udpEntity---211----------udpEntity == null");
       * callBack(null, null, "无法连接netty!", RESPONSE_FAIL); }
       */
      // 使用netty是时候 清理p2p
      P2pClearUp();
    } else {

    }
    Log.i("mbk", "初始化 连接服务器!" + netType);
  }

  @Override
  public void sendByte(byte[] b) {
    try {
      if (mManager != null) {
        mManager.sendDataToServer(new SendData(b));
      } else {
        isClose();
      }
    } catch (InterruptedException e) {
      isClose();
    }
  }

  // 服务端回调
  private CommunicationCallBack mCommunicationCallBack = new CommunicationCallBack() {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
      Log.i("mbk", "--------------------------请求异常--------------------------" + mCommunication);
      isRunning = false;
      callBack(null, null, "请求异常!", RESPONSE_FAIL);

    }

    @Override
    public void connected(ChannelHandlerContext ctx) {
      Log.i("mbk", "--------------------------连接成功--------------------------" + mCommunication);
      // mChx = ctx;
      isRunning = true;
      sendAgain();
    }

    @Override
    public void connectFailure(Exception e) {
      Log.i("mbk", "--------------------------连接服务器失败--------------------------" + mCommunication);
      isRunning = false;
      callBack(null, null, "连接服务器失败!", RESPONSE_FAIL);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, byte[] msg) {
      Log.i("mbk", "--------------------------服务端返回--------------------------" + mCommunication);
      if (mParseByteThread != null) {
        mParseByteThread.sendParseByte(msg);
      }
    }

    @Override
    public void communicationOutTime() {
      Log.i("mbk", "--------------------------连接超时--------------------------" + mCommunication);
      isRunning = false;
      callBack(null, null, "连接超时!", RESPONSE_TIMEOUT);
    }

    @Override
    public void questTimeOut() {
      Log.i("mbk", "--------------------------请求超时--------------------------" + mCommunication);
      isRunning = false;
      callBack(null, null, "请求超时!", RESPONSE_TIMEOUT);
    }
  };

  @Override
  public void sendAgain() {
    // 连接成功 发起请求
    Log.i("mbk", "连接成功,数据重新发送!");

    // basicHandler.sendEmptyMessage(ClientConstants.REQUEST_SEND_MESSAGE);
    basicHandler.sendEmptyMessageDelayed(ClientConstants.REQUEST_SEND_MESSAGE, 500);
  }

  // 接收需要发送的实体
  @Override
  public void sendEntity(IEntity entity) {
    if (mEntityMsg != null && entity != null) {
      mEntityMsg.add(entity);
      if (!isSending) {
        // 启动一个发送
        Log.i("mbk", "发起请求!REQUEST_NET");
        basicHandler.sendEmptyMessage(ClientConstants.REQUEST);
      }
    }
    // if (mEntityMsg != null && mEntityMsg.size() == 2) {
    // mEntityMsg.remove(1);
    // }

  }

  @Override
  public void callBack(PackageHeader header, byte[] data, String desc, int type) {
    basicHandler.removeMessages(ClientConstants.REQUEST_SEND_HEARTBEAT);

    switch (type) {
      case RESPONSE_SUCCESS:
        heartTimeOutCount = 0;
        basicHandler.sendEmptyMessageDelayed(ClientConstants.REQUEST_SEND_HEARTBEAT, 20000);
        switch (header.getFunction()) {
          case 9998:
            Log.i("mbk", "服务端关闭!");
            isClose();
            break;
          case 9999:
            Log.i("mbk", "成功返回一个心跳!");
            break;
          case 999:
            Log.i("mbk", "未知错误!");
            callBack(null, null, "未知错误", RESPONSE_FAIL);
            break;
          default:
            responseSuccess(header, data, desc, type);
            break;
        }
        break;
      case REQUEST_HEARTBEAT_TIMEOUT:// 心跳超时3次认为与云棒无连接
        /*
         * Intent m2Intent = new Intent(MeiConfigs.NETWORK_PROMPT);
         * m2Intent.putExtra("islogin", "3003");
         * MeiApp.mContext.sendBroadcast(m2Intent);
         */
        break;
      case RESPONSE_FAIL:
        responseFail(header, data, desc, type);
        break;
      case RESPONSE_TIMEOUT:
        responseFail(header, data, desc, type);
        break;
    }
  }

  // 请求成功
  public void responseSuccess(PackageHeader header, byte[] data, String desc, int type) {

    try {
      if (mEntityMsg.size() > 0 && mEntityMsg.get(0).getHandler() != null) {
        IEntity entity = mEntityMsg.get(0);
        if (data != null && data.length > 0) {
          entity.onDecode(new String(data, "utf-8"));
          // Log.i("mbk","云棒返回---" + "---" + new String(data, "utf-8"));
          // 请求成功

          Log.i("lzy02", "1--------------" + entity.getCode());
          Log.i("mbk", "返回一条数据!");
          Message msg = Message.obtain();
          msg.obj = entity;
          msg.arg1 = header.getFunction();
          msg.what = type;
          entity.getHandler().sendMessage(msg);
        }
      }
    } catch (Exception e) {
      e.printStackTrace();
      isClose();
    }
    if (mEntityMsg != null && mEntityMsg.size() > 0) {
      mEntityMsg.remove(0);
    }
    basicHandler.removeMessages(ClientConstants.REQUEST_TIMEOUT);
    isSending = false;
    if (mEntityMsg.size() > 0) {
      basicHandler.sendEmptyMessage(ClientConstants.REQUEST);
    }
  }

  // 请求失败
  public void responseFail(PackageHeader header, byte[] data, String desc, int type) {
    Log.i("mbk", "请求失败!   " + desc);
    Message msg = Message.obtain();
    msg.obj = desc;
    msg.arg1 = 0;
    msg.what = type;
    if (mEntityMsg.size() > 0 && mEntityMsg.get(0).getHandler() != null) {
      mEntityMsg.get(0).getHandler().sendMessage(msg);
    }
    isClose();
  }

  // 请求本地缓存返回
  @Override
  public void callBack(IEntity entity, String desc) {
    Log.i("mbk", "回一返个缓存数据!  ");
    if ("cache".equals(desc)) {
      if (entity != null && entity.getHandler() != null) {
        Message msg = Message.obtain();
        msg.obj = entity;
        msg.what = RESPONSE_SUCCESS;
        entity.getHandler().sendMessage(msg);
      }
    }
  }

  public void P2pClearUp() {
    if (mManager != null) {
      mManager.p2pCleanup();
    }
  }

  @Override
  public void isClose() {
    Log.i("mbk", "关闭连接!" + isRunning);
    if (mManager != null) {
      if (isRunning) {
        try {
          mManager.sendDataToServer(new SendData(ClientSocketUtils.sendExit(function)));
        } catch (InterruptedException e) {
        }
      } else {
        mManager.closeTheadManager();
        mManager = null;
      }
    }
    if (mParseByteThread != null)
      mParseByteThread.closeThread();
    if (mEntityMsg != null) {
      mEntityMsg.clear();
    }
    P2pClearUp();
    basicHandler.removeMessages(ClientConstants.REQUEST_SEND_HEARTBEAT);
    basicHandler.removeMessages(ClientConstants.REQUEST_TIMEOUT);
    isRunning = false;
    isSending = false;
  }

  @Override
  public void sendMsgFail(String netType, byte[] msg) {
  }

  @Override
  public void connectFail(String netType) {
  }

  @Override
  public void isClearMsg() {
    if (mEntityMsg != null) {
      mEntityMsg.clear();
    }
  }

}
复制代码
复制代码
public class BasicServicesMgr extends BaseClientMgr {

  public static BasicServicesMgr instance = null;

  public static BasicServicesMgr getInstance() {
    if (instance == null) {
      instance = new BasicServicesMgr();
    }
    return instance;
  }

  private BasicServicesMgr() {
    super( "192.168.43.1", 9223, ClientConnectorManager.BASIC_SERVICES_MGR_KEY);
  }


  //接收需要发送的实体
  @Override
  public void sendEntity(IEntity entity) {
    if (entity != null) {
      
      // 请求列表每次最多保存两个请求
      if (mEntityMsg != null && mEntityMsg.size() == 2) {
        mEntityMsg.remove(1);
      }
      mEntityMsg.add(entity);
      if (!isSending) {
        // 启动一个发送
        isSending = true;
        basicHandler.sendEmptyMessage(ClientConstants.REQUEST);
      }
      
    }
  }
}
复制代码
public interface Observer {

  //更新接口
  public void update(IEntity state);
}
复制代码
class ParseByteThread implements Runnable {

  private byte[] bufHeader = null;
  private byte[] readData = null;
  private PackageHeader header = null;
  private int headerLenth = PackageHeader.headerLenth;
  private int readDataLenth = 0;
  private int sLength = 0;// 添加到数组的长度
  private Handler fileParseHandler = null;
  private IClientConnect connect;
  
  public static final int RESPONSE_SUCCESS = 0x401;
  public static final int RESPONSE_FAIL = 0x402;
  public static final int RESPONSE_TIMEOUT = 0x403;
  /** 心跳超时  */
  public static final int REQUEST_HEARTBEAT_TIMEOUT = 0x410;
  /** 用户未登录  */
  public static final int  NOT_LOGIN= 0x411;

  public Handler getFileParseHandler() {
    return this.fileParseHandler;
  }

  public void sendParseByte(byte[] msg) {
    if (fileParseHandler != null) {
      Message msgData = Message.obtain();
      msgData.obj = msg;
      fileParseHandler.sendMessage(msgData);
    }
  }
  
  public ParseByteThread(IClientConnect connect) {
    readDataLenth = 0;
    sLength = 0;
    headerLenth = PackageHeader.headerLenth;
    bufHeader = new byte[PackageHeader.headerLenth];
    readData = null;
    header = new PackageHeader();
    this.connect = connect;
  }

  public void setFileParseHandler(Handler fileParseHandler) {
    this.fileParseHandler = fileParseHandler;
  }
  
  public void closeThread(){
    readDataLenth = 0;
    sLength = 0;
    headerLenth = PackageHeader.headerLenth;
    bufHeader = new byte[PackageHeader.headerLenth];
    readData = null;
    header = new PackageHeader();
  }
  @Override
  public void run() {

    Looper.prepare();
    fileParseHandler = new Handler() {
      public void handleMessage(Message data) {
        synchronized (data) {
          byte[] msg = (byte[]) data.obj;
          if (msg == null) {
            return;
          }
          int msgLength = msg.length;
          int useLength = 0;// 已经使用的长度
          while (msgLength - useLength > 0) {
            // 读取包头
            if (readDataLenth == 0) {
              if (msgLength - useLength >= headerLenth - sLength) {
                // 读取了一个完整的包头
                System.arraycopy(msg, useLength, bufHeader, sLength, headerLenth - sLength);
                useLength += (headerLenth - sLength);
                sLength = 0;
                header.setPackageHeader(bufHeader);
                if (header.getFunction() > 10000 || header.getFunction() < 999) {
                  // 包头不符合,跳出循环 放弃整包
                  connect.callBack(null, null, "包头不符合",  RESPONSE_FAIL);
                  break;
                }
                if (header.getFunction() != 9999 && header.getFunction() != 9998) {
                  readDataLenth = (int) header.getInclusionLenth();
                  readData = null;
                  readData = new byte[readDataLenth];
                } else if (header.getFunction() == 9999) {
                  // 发送心跳包
                  connect.callBack(header, readData, "",  RESPONSE_SUCCESS);
                } else if (header.getFunction() == 9998) {
                  msgLength = 0;
                  useLength = 0;
                  connect.callBack(header, readData, "",  RESPONSE_SUCCESS);
                }
              } else {

                System.arraycopy(msg, useLength, bufHeader, sLength, msgLength - useLength);
                sLength += (msgLength - useLength);
                break;
              }
            }
            // 读取包体
            else {
              if (msgLength - useLength >= readDataLenth - sLength) {
                // 读取了一个完整的包体
                System.arraycopy(msg, useLength, readData, sLength, readDataLenth - sLength);
                useLength += (readDataLenth - sLength);
                sLength = 0;
                readDataLenth = 0;
                bufHeader = null;
                bufHeader = new byte[PackageHeader.headerLenth];
                // 解析成功 返回数据
                try {
                  connect.callBack(header, readData, "",  RESPONSE_SUCCESS);
                } catch (Exception e) {
                  e.printStackTrace();
                }
              } else {
                System.arraycopy(msg, useLength, readData, sLength, msgLength - useLength);
                sLength += (msgLength - useLength);
                break;
              }
            }
          }
        }
      }
    };
    Looper.loop();
  }
}
复制代码
复制代码
public abstract class Subject {

  //用来保存注册的观察者对象
  private List<Observer> list = new ArrayList<Observer>();

  private Handler subHandler = new Handler(MeiApp.mContext.getMainLooper()) {
    public void handleMessage(Message msg) {
      if (list != null && list.size() > 0) {
        for (int i = 0; i < list.size(); i++) {
          list.get(i).update((IEntity) msg.obj);
        }
      }
    }
  };


  //注册观察者对象
  public void attach(Observer observer) {
    if (list != null) {
      list.add(observer);
    }
  }


  //删除观察者对象
  public void detach(Observer observer) {
    if (list != null && list.size() > 0 && observer != null) {
      list.remove(observer);

    }
  }


  //删除观察者对象
  public void clear() {

    if (list != null && list.size() > 0) {
      list.clear();
    }

  }


  //通知所有注册的观察者对象
  public void nodifyObservers(final IEntity newState) {

    new Thread(new Runnable() {

      @Override
      public void run() {
        Message msg = Message.obtain();
        msg.obj = newState;
        subHandler.sendMessage(msg);

      }
    }).start();

  }
}
复制代码

 

 

代码见https://github.com/huanyi0723/NettyTest

 

  

 

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多