分享

UDP实现可靠传输的简单实例

 筑心wup 2013-06-02

UDP实现可靠传输的简单实例
核心思想:模拟TCP的传输过程


import java.net.*;
import java.io.*;

/**
* 发送消息的结构定义
* @author Administrator
*
*/
public class NetJavaMsg {
     
      private int totalLen; //数据的长度
      private int id;  //唯一的ID
      private byte[] data; //消息内容
     
      //本地参数,为简化起见,不发送
      private SocketAddress recvRespAdd;  //发送者接收应答的地址
      private SocketAddress destAdd;  //接收者地址
     
      private int sendCount=0; //发送次数
      private long lastSendTime; //最后一次发送的时间
     
      /**
        *
        * @param id  //唯一的序号
        * @param data  //数据内容
        */
      public NetJavaMsg(int id,byte[] data){
              this.id=id;
              this.data=data;
              totalLen=4+4+data.length;
      }
     
     
      /**
        *
        * @param udpData  //将受到的udp数据解析为NetJavaMsg对象
        */
      public NetJavaMsg(byte[] udpData){
              try{
                    ByteArrayInputStream bins=new ByteArrayInputStream(udpData);
                    DataInputStream dins=new DataInputStream(bins);
                   
                    this.totalLen=dins.readInt();
                    this.id=dins.readInt();
                   
                    this.data=new byte[totalLen-4-4];
                    dins.readFully(data);
                   
                   
              }catch(Exception e){
                    e.printStackTrace();
              }
             
      }
     
     
      public byte[] toByte(){
              try{
                    ByteArrayOutputStream bous=new ByteArrayOutputStream();
                    DataOutputStream dous=new DataOutputStream(bous);
                    dous.writeInt(totalLen);
                    dous.writeInt(id);
                    dous.write(data);
                    dous.flush();
                   
                    return bous.toByteArray();
                   
              }catch(Exception e){
                    e.printStackTrace();
              }
             
              return null;
      }

      @Override
      public String toString() {
              // TODO Auto-generated method stub
              return "id:"+id+"  content"+new String(data)+"  totalLen"+totalLen+"
sengerAdd:"+recvRespAdd+"  destAdd:"+destAdd;

      }


      public int getTotalLen() {
              return totalLen;
      }

      public void setTotalLen(int totalLen) {
              this.totalLen = totalLen;
      }

      public int getId() {
              return id;
      }

      public void setId(int id) {
              this.id = id;
      }

      public byte[] getData() {
              return data;
      }

      public void setData(byte[] data) {
              this.data = data;
      }

      public SocketAddress getRecvRespAdd() {
              return recvRespAdd;
      }

      public void setRecvRespAdd(SocketAddress recvRespAdd) {
              this.recvRespAdd = recvRespAdd;
      }

      public SocketAddress getDestAdd() {
              return destAdd;
      }

      public void setDestAdd(SocketAddress destAdd) {
              this.destAdd = destAdd;
      }

      public int getSendCount() {
              return sendCount;
      }

      public void setSendCount(int sendCount) {
              this.sendCount = sendCount;
      }

      public long getLastSendTime() {
              return lastSendTime;
      }

      public void setLastSendTime(long lastSendTime) {
              this.lastSendTime = lastSendTime;
      }
     
     
     
     
     
     
     

}




import java.io.*;

public class NetJavaRespMsg {

   

    private int totalLen;

    private int repId;  //回复对应接收到的消息ID

    private byte state=0; //状态 0:正确接收          其它:错误

    private long resTime; //应答方的发送时间

   

    public NetJavaRespMsg(int repId,byte state,long resTime){

      this.repId=repId;

      this.state=state;

      this.resTime=resTime;

      totalLen=4+4+1+8;

     

    }

   

    public NetJavaRespMsg(byte[] udpData){

      try{

          ByteArrayInputStream bins=new ByteArrayInputStream(udpData);

          DataInputStream dins=new DataInputStream(bins);

         

         

          this.totalLen=dins.readInt();

          this.repId=dins.readInt();

          this.state=dins.readByte();

          this.resTime=dins.readLong();

         

         

         

      }catch(Exception e){

          e.printStackTrace();

      }

    }

   

   

    public byte[] toByte(){

      try{

          ByteArrayOutputStream bous=new ByteArrayOutputStream();

          DataOutputStream dous=new DataOutputStream(bous);

          dous.writeInt(this.totalLen);

          dous.writeInt(this.repId);

          dous.writeByte(this.state);

          dous.writeLong(this.resTime);

          dous.flush();

         

          return bous.toByteArray();

         

         

      }catch(Exception e){

          e.printStackTrace();

      }

      return null;

    }

   

   

    @Override

    public String toString() {

      // TODO Auto-generated method stub

      return "totalLen:"+this.totalLen+"  respID"+this.repId+"  state"+this.state+"  resTime"+resTime;

     

    }

    public int getTotalLen() {

      return totalLen;

    }

    public void setTotalLen(int totalLen) {

      this.totalLen = totalLen;

    }

    public int getRepId() {

      return repId;

    }

    public void setRepId(int repId) {

      this.repId = repId;

    }

    public byte getState() {

      return state;

    }

    public void setState(byte state) {

      this.state = state;

    }

    public long getResTime() {

      return resTime;

    }

    public void setResTime(long resTime) {

      this.resTime = resTime;

    }

   

   

   

}







import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.*;
import java.util.concurrent.*;

/**
* 数据报发送 
* 1.发送消息线程负责发送,发送后将消息放入容器中等待应答
* 2.接收线程接收应答,从容器中匹配后删除
* 3.重发线程负责重发,未收到应答的消息,发送3次后移除
* @author Administrator
*
*/
public class DatagramSend {

      private SocketAddress localAddr; //本地要发送的地址对象
      private DatagramSocket dSender; //发送的Socket对象
      private SocketAddress destAddr; //目标地址
     
      //本地缓存已发送的消息Map  key为消息ID  value为消息对象本身
      Map<Integer,NetJavaMsg> msgQueue=new ConcurrentHashMap();
     
     
      public DatagramSend() throws Exception{
              localAddr=new InetSocketAddress("172.16.1.151",13000);
              dSender=new DatagramSocket(localAddr);
              destAddr=new InetSocketAddress("172.16.1.151",14000);
             
              //启动三个线程
              startSendThread();
              startRecvResponseThread();
              startReSendThread();
             
             
      }
     
     
      //启动发送线程
      public void startSendThread(){
              new Thread(new Runnable(){

                    @Override
                    public void run() {
                            try {
                                  send();
                                 
                            } catch (Exception e) {
                                  // TODO Auto-generated catch block
                                  e.printStackTrace();
                            }
                           
                    }}).start();
             
      }
     
     
      //模拟发送消息
      public void send() throws Exception{
              System.out.println("发送端-发送数据线程启动...");
              int id=0;
              while(true){
                    id++;
                    byte[] msgData=(id+"-hello").getBytes();
                   
                    //创建要发送的消息对象
                    NetJavaMsg sendMsg=new NetJavaMsg(id,msgData);
                   
                   
                    //要发送的数据:将要发送的数据转为字节数组
                    byte[] buffer=sendMsg.toByte();
                   
                    //创建数据包,指定内容,指定目标地址
                    DatagramPacket dp=new DatagramPacket(buffer,buffer.length,destAddr);
                   
                    dSender.send(dp);  //发送
                   
                    sendMsg.setSendCount(1);
                    sendMsg.setLastSendTime(System.currentTimeMillis());
                    sendMsg.setRecvRespAdd(localAddr);
                    sendMsg.setDestAdd(destAddr);
                   
                    msgQueue.put(id, sendMsg);
                    System.out.println("客户端-数据已发送"+sendMsg);
                    Thread.sleep(1000);
                   
              }
      }
     
     
      //启动接收应答线程
      public void startRecvResponseThread(){
              new Thread(new Runnable(){

                    @Override
                    public void run() {
                            try{
                                  recvResponse();
                                 
                            }catch(Exception e){
                                  e.printStackTrace();
                            }
                           
                    }}).start();
      }
     
     
      //接收应答消息
      public void recvResponse() throws Exception{
              System.out.println("接收端-接收应答线程启动...");
              while(true){
                    byte[] recvData=new byte[100];
                   
                    //创建接收数据包对象
                    DatagramPacket recvPacket=new DatagramPacket(recvData,recvData.length);
                   
                    dSender.receive(recvPacket);
                    NetJavaRespMsg resp=new NetJavaRespMsg(recvPacket.getData());
                   
                    int respID=resp.getRepId();
                    NetJavaMsg msg=msgQueue.get(new Integer(respID));
                   
                    if(msg!=null){
                            System.out.println("接收端-已收到:"+msg);
                            msgQueue.remove(respID);
                    }
                   
              }
      }
     
     
     
      //启动重发线程
      public void startReSendThread(){
              new Thread(new Runnable(){

                    @Override
                    public void run() {
                            try{
                                  while(true){
                                          resendMsg();
                                          Thread.sleep(1000);
                                  }
                                 
                            }catch(Exception e){
                                  e.printStackTrace();
                            }
                           
                    }}).start();
      }
     
     
      //判断Map中的消息,如果超过3秒未收到应答,则重发
      public void resendMsg(){
              Set<Integer> keyset=msgQueue.keySet();
              Iterator<Integer> it=keyset.iterator();
              while(it.hasNext()){
                    Integer key=it.next();
                    NetJavaMsg msg=msgQueue.get(key);
                   
                    if(msg.getSendCount()>3){
                            it.remove();
                            System.out.println("***发送端---检测到丢失的消息"+msg);
                    }
                   
                    long cTime=System.currentTimeMillis();
                    if((cTime-msg.getLastSendTime())>3000&&msg.getSendCount()<3){
                            byte[] buffer=msg.toByte();
                            try{
                                  DatagramPacket dp=new DatagramPacket(buffer,buffer.length,msg.getDestAdd());
                                  dSender.send(dp);
                                  msg.setSendCount(msg.getSendCount()+1);
                                  System.out.println("客户端--重发消息:"+msg);
                                 
                            }catch(Exception e){
                                  e.printStackTrace();
                                 
                            }
                    }
              }
      }
     
     
      public static void main(String[] args) throws Exception{
              new DatagramSend();

      }

}









import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
public class DatagramRecive {
   
      private SocketAddress localAddr;
      private DatagramSocket dSender;

      public DatagramRecive() throws Exception{
              localAddr=new InetSocketAddress("172.16.1.151",14000);
              dSender=new DatagramSocket(localAddr);
             
              //启动接收线程
              startRecvThread();
      }

     
      public void startRecvThread(){
              new Thread(new Runnable(){

                    @Override
                    public void run() {
                            try{
                                  recvMsg();
                                 
                            }catch(Exception e){
                                  e.printStackTrace();
                                 
                            }
                           
                    }}).start();
      }

     
      public void recvMsg() throws Exception{
              System.out.println("接收线程启动");
              while(true){
                    byte[] recvData=new byte[100];
                    DatagramPacket recvPacket=new DatagramPacket(recvData,recvData.length);
                    dSender.receive(recvPacket);
                   
                    NetJavaMsg recvMsg=new NetJavaMsg(recvPacket.getData());
                   
                    NetJavaRespMsg resp=new NetJavaRespMsg(recvMsg.getId(),(byte)0,System.currentTimeMillis());
                   
                    byte[] data=resp.toByte();
                    DatagramPacket dp=new DatagramPacket(data,data.length,recvPacket.getSocketAddress());
                    dSender.send(dp);
                   
                    System.out.println("接收端-已发送应答"+resp);
              }
      }
     
     
      public static void main(String[] args) throws Exception{
              new DatagramRecive();

      }

}

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约