// UDP实现可靠传输的简单实例 — a simple example of reliable transmission over UDP
import java.io.*;
/**
 * Acknowledgement ("response") message sent back for a received UDP message.
 *
 * <p>Wire layout, in the big-endian order produced by {@link DataOutputStream}:
 * 4-byte total length, 4-byte id of the acknowledged message, 1-byte state and
 * 8-byte send time of the responder, i.e. 17 bytes in total.
 */
public class NetJavaRespMsg {

    /** Encoded size in bytes: totalLen(4) + repId(4) + state(1) + resTime(8). */
    private static final int ENCODED_LENGTH = 4 + 4 + 1 + 8;

    private int totalLen;
    private int repId;       // id of the received message this response answers
    private byte state = 0;  // 0: received correctly, any other value: error
    private long resTime;    // time at which the responder sent this reply

    /**
     * Creates a response to be sent.
     *
     * @param repId   id of the message being acknowledged
     * @param state   0 for "received correctly", any other value for an error
     * @param resTime send time of the responder
     */
    public NetJavaRespMsg(int repId, byte state, long resTime) {
        this.repId = repId;
        this.state = state;
        this.resTime = resTime;
        this.totalLen = ENCODED_LENGTH;
    }

    /**
     * Decodes a response from the payload of a received datagram. Only the first
     * 17 bytes are read, so a larger receive buffer may be passed as is.
     *
     * @param udpData buffer holding the encoded response at offset 0
     * @throws IllegalArgumentException if {@code udpData} is {@code null} or holds
     *         fewer than 17 bytes (previously this was only printed and left the
     *         object with default field values)
     */
    public NetJavaRespMsg(byte[] udpData) {
        if (udpData == null || udpData.length < ENCODED_LENGTH) {
            throw new IllegalArgumentException(
                    "response needs at least " + ENCODED_LENGTH + " bytes, got "
                            + (udpData == null ? "null" : String.valueOf(udpData.length)));
        }
        try {
            DataInputStream dins = new DataInputStream(new ByteArrayInputStream(udpData));
            this.totalLen = dins.readInt();
            this.repId = dins.readInt();
            this.state = dins.readByte();
            this.resTime = dins.readLong();
        } catch (IOException e) {
            // Not expected: the length was validated above and the source is in memory.
            throw new IllegalStateException("failed to decode response message", e);
        }
    }

    /**
     * Encodes this response into its 17-byte wire form.
     *
     * @return a new array containing totalLen, repId, state and resTime in that order
     * @throws IllegalStateException if encoding fails (not expected for an in-memory
     *         stream; previously {@code null} was returned in that case)
     */
    public byte[] toByte() {
        ByteArrayOutputStream bous = new ByteArrayOutputStream(ENCODED_LENGTH);
        DataOutputStream dous = new DataOutputStream(bous);
        try {
            dous.writeInt(this.totalLen);
            dous.writeInt(this.repId);
            dous.writeByte(this.state);
            dous.writeLong(this.resTime);
            dous.flush();
        } catch (IOException e) {
            throw new IllegalStateException("failed to encode response message", e);
        }
        return bous.toByteArray();
    }

    /** Returns all four fields as {@code label:value} pairs separated by spaces. */
    @Override
    public String toString() {
        return "totalLen:" + this.totalLen
                + " respID:" + this.repId
                + " state:" + this.state
                + " resTime:" + this.resTime;
    }

    public int getTotalLen() {
        return totalLen;
    }

    public void setTotalLen(int totalLen) {
        this.totalLen = totalLen;
    }

    public int getRepId() {
        return repId;
    }

    public void setRepId(int repId) {
        this.repId = repId;
    }

    public byte getState() {
        return state;
    }

    public void setState(byte state) {
        this.state = state;
    }

    public long getResTime() {
        return resTime;
    }

    public void setResTime(long resTime) {
        this.resTime = resTime;
    }
}
// ---- DatagramSend.java ----
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.*;
import java.util.concurrent.*;

/**
 * Datagram sender with a simple acknowledge/retransmit scheme.
 * <ol>
 * <li>The send thread transmits a message and keeps it in a pending map until it
 *     is acknowledged.</li>
 * <li>The receive thread reads responses and removes the matching message from
 *     the pending map.</li>
 * <li>The resend thread retransmits messages that were not acknowledged; a message
 *     that was sent three times without a response is removed and reported.</li>
 * </ol>
 *
 * @author Administrator
 */
public class DatagramSend {

    /** Total number of transmissions (first send included) before a message is given up. */
    private static final int MAX_SEND_COUNT = 3;
    /** Time without a response after which a message is resent or given up. */
    private static final long RESEND_TIMEOUT_MS = 3000L;
    /** Pause between two messages and between two scans of the pending map. */
    private static final long LOOP_INTERVAL_MS = 1000L;
    /** Size of the buffer used to receive responses. */
    private static final int RECV_BUFFER_SIZE = 100;

    private final SocketAddress localAddr;  // local address the socket is bound to
    private final DatagramSocket dSender;   // socket used for sending and for receiving responses
    private final SocketAddress destAddr;   // destination address
    // Messages sent but not yet acknowledged: key is the message id, value the message itself.
    Map<Integer, NetJavaMsg> msgQueue = new ConcurrentHashMap<Integer, NetJavaMsg>();

    /**
     * Binds to 172.16.1.151:13000, targets 172.16.1.151:14000 and starts the three
     * worker threads.
     *
     * @throws Exception if the socket cannot be created
     */
    public DatagramSend() throws Exception {
        this(new InetSocketAddress("172.16.1.151", 13000),
                new InetSocketAddress("172.16.1.151", 14000));
    }

    /**
     * Binds to {@code localAddr}, targets {@code destAddr} and starts the three
     * worker threads.
     *
     * @param localAddr address to bind the socket to
     * @param destAddr  address messages are sent to
     * @throws Exception if the socket cannot be created
     */
    public DatagramSend(SocketAddress localAddr, SocketAddress destAddr) throws Exception {
        this.localAddr = localAddr;
        this.dSender = new DatagramSocket(localAddr);
        this.destAddr = destAddr;
        startSendThread();
        startRecvResponseThread();
        startReSendThread();
    }

    /** Starts the thread that runs {@link #send()}. */
    public void startSendThread() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    send();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible to the thread
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    /**
     * Simulates a producer: sends one "&lt;id&gt;-hello" message per second, forever.
     *
     * @throws Exception if sending fails or the thread is interrupted while sleeping
     */
    public void send() throws Exception {
        System.out.println("发送端-发送数据线程启动...");
        int id = 0;
        while (true) {
            id++;
            byte[] msgData = (id + "-hello").getBytes();
            NetJavaMsg sendMsg = new NetJavaMsg(id, msgData);
            // Encode the message and address the packet to the destination.
            byte[] buffer = sendMsg.toByte();
            DatagramPacket dp = new DatagramPacket(buffer, buffer.length, destAddr);

            // Register the message as pending BEFORE it goes out; otherwise a fast
            // response could arrive while the id is still unknown to the receive
            // thread and the message would be resent needlessly.
            sendMsg.setSendCount(1);
            sendMsg.setLastSendTime(System.currentTimeMillis());
            sendMsg.setRecvRespAdd(localAddr);
            sendMsg.setDestAdd(destAddr);
            msgQueue.put(id, sendMsg);

            dSender.send(dp);
            System.out.println("客户端-数据已发送" + sendMsg);
            Thread.sleep(LOOP_INTERVAL_MS);
        }
    }

    /** Starts the thread that runs {@link #recvResponse()}. */
    public void startRecvResponseThread() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    recvResponse();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    /**
     * Receives responses forever and removes the acknowledged message from the
     * pending map.
     *
     * @throws Exception if receiving fails
     */
    public void recvResponse() throws Exception {
        System.out.println("接收端-接收应答线程启动...");
        while (true) {
            // A fresh buffer per datagram, so bytes of an earlier response are never re-read.
            byte[] recvData = new byte[RECV_BUFFER_SIZE];
            DatagramPacket recvPacket = new DatagramPacket(recvData, recvData.length);
            dSender.receive(recvPacket);
            NetJavaRespMsg resp = new NetJavaRespMsg(recvPacket.getData());
            // remove() returns the pending message, or null for an unknown/duplicate response.
            NetJavaMsg msg = msgQueue.remove(Integer.valueOf(resp.getRepId()));
            if (msg != null) {
                System.out.println("接收端-已收到:" + msg);
            }
        }
    }

    /** Starts the thread that calls {@link #resendMsg()} once per second. */
    public void startReSendThread() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (true) {
                        resendMsg();
                        Thread.sleep(LOOP_INTERVAL_MS);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible to the thread
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    /**
     * Scans the pending map once. A message whose last transmission is more than
     * three seconds old is resent; if it has already been sent three times it is
     * removed and reported as lost instead.
     */
    public void resendMsg() {
        // Iterating entries (not keys + get) avoids a null value when the receive
        // thread removes an entry while this scan is running.
        Iterator<Map.Entry<Integer, NetJavaMsg>> it = msgQueue.entrySet().iterator();
        while (it.hasNext()) {
            NetJavaMsg msg = it.next().getValue();
            long cTime = System.currentTimeMillis();
            if ((cTime - msg.getLastSendTime()) <= RESEND_TIMEOUT_MS) {
                continue; // still waiting for the response to the latest transmission
            }
            if (msg.getSendCount() >= MAX_SEND_COUNT) {
                // All attempts used up and the last one timed out: give up on this message.
                it.remove();
                System.out.println("***发送端---检测到丢失的消息" + msg);
                continue;
            }
            byte[] buffer = msg.toByte();
            try {
                DatagramPacket dp = new DatagramPacket(buffer, buffer.length, msg.getDestAdd());
                dSender.send(dp);
                msg.setSendCount(msg.getSendCount() + 1);
                // Restart the timeout, so the next attempt waits a full interval again.
                msg.setLastSendTime(System.currentTimeMillis());
                System.out.println("客户端--重发消息:" + msg);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        new DatagramSend();
    }
}

// ---- DatagramRecive.java ----
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;

/**
 * Datagram receiver: reads messages from its socket and answers each one with a
 * {@link NetJavaRespMsg} carrying the id of the received message.
 */
public class DatagramRecive {

    /** Size of the buffer used to receive messages. */
    private static final int RECV_BUFFER_SIZE = 100;

    private final SocketAddress localAddr;  // local address the socket is bound to
    private final DatagramSocket dSender;   // socket used for receiving and for sending responses

    /**
     * Binds to 172.16.1.151:14000 and starts the receive thread.
     *
     * @throws Exception if the socket cannot be created
     */
    public DatagramRecive() throws Exception {
        this(new InetSocketAddress("172.16.1.151", 14000));
    }

    /**
     * Binds to {@code localAddr} and starts the receive thread.
     *
     * @param localAddr address to bind the socket to
     * @throws Exception if the socket cannot be created
     */
    public DatagramRecive(SocketAddress localAddr) throws Exception {
        this.localAddr = localAddr;
        this.dSender = new DatagramSocket(localAddr);
        startRecvThread();
    }

    /** Starts the thread that runs {@link #recvMsg()}. */
    public void startRecvThread() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    recvMsg();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    /**
     * Receives messages forever; every message is acknowledged with state 0 and the
     * current time, sent back to the address the message came from.
     *
     * @throws Exception if receiving or sending fails
     */
    public void recvMsg() throws Exception {
        System.out.println("接收线程启动");
        while (true) {
            byte[] recvData = new byte[RECV_BUFFER_SIZE];
            DatagramPacket recvPacket = new DatagramPacket(recvData, recvData.length);
            dSender.receive(recvPacket);
            NetJavaMsg recvMsg = new NetJavaMsg(recvPacket.getData());
            NetJavaRespMsg resp =
                    new NetJavaRespMsg(recvMsg.getId(), (byte) 0, System.currentTimeMillis());
            byte[] data = resp.toByte();
            DatagramPacket dp =
                    new DatagramPacket(data, data.length, recvPacket.getSocketAddress());
            dSender.send(dp);
            System.out.println("接收端-已发送应答" + resp);
        }
    }

    public static void main(String[] args) throws Exception {
        new DatagramRecive();
    }
}
|