1 简单的client和server通讯下面的程序很简单,可以自己copy下来跑一下,客户端和服务端可以简单的通信 Client端代码 public class Client { public static void main(String[] args) throws Exception { /** * 要指定ip和端口 * 也可以使用bind指定本地的端口,不然是随机分配的,然后再connect */ Socket socket = new Socket(InetAddress.getLocalHost(), 5678); //socket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 5674)); //socket.connect(new InetSocketAddress(InetAddress.getLocalHost(), 5678)); BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); PrintWriter out = new PrintWriter(socket.getOutputStream()); BufferedReader wt = new BufferedReader(new InputStreamReader(System.in)); while (true) { System.out.println("input sth"); String str = wt.readLine(); out.println(str); out.flush(); if (str.equals("end")) { break; } System.out.println(in.readLine()); } socket.close(); } } Server端代码 public class MyServer { public static void main(String[] args) throws IOException { //new ServerSocket(port),相当于执行了bind,然后accept ServerSocket server = new ServerSocket(5678); Socket client = server.accept(); //相比较于nio的buffer形式,以前的io确实麻烦不少 //要获取到socket的InputStream和OutputStream BufferedReader in = new BufferedReader( new InputStreamReader(client.getInputStream())); PrintWriter out = new PrintWriter(client.getOutputStream()); while (true) { String str = in.readLine(); System.out.println(str); out.println("has receive...."+str); out.flush(); if (str.equals("end")) break; } client.close(); } } 可以看到下面的输出================================ client:input sth client:hello world server:has receive....hello world client:input sth client:end 这种模式,单线程,一个|Server一次只接收一个请求 2 server端阻塞式的接收多个请求上面的只是一个demo,服务端只能接收一次连接,下面改造一个,使用多线程,服务端不断的接收请求(客户端不变) public class MySimgleThreadServer { /** * 将accept放在while循环中, *可以實現和多client連接,但是缺點就是只有在一個client連接并处理完后, * 才能有新的client連接,不能多client同時連接 * @param args * @throws IOException */ public static void main(String[] args) throws IOException { ServerSocket server = new ServerSocket(5678); while (true) { Socket client = server.accept(); try{ BufferedReader in = new BufferedReader( new InputStreamReader(client.getInputStream())); PrintWriter out = new PrintWriter(client.getOutputStream()); while (true) { System.out.println(client.getLocalPort()); String str = in.readLine(); System.out.println(str); out.println("has receive...."+str); out.flush(); if (str.equals("end")) break; } client.close(); }catch(Exception e){ } } } } 这种模式,还是单线程,一个|Server可以处理多个请求,但是一次还是只能接收一个请求 3 使用多线程接收多个请求(bio)知道上面应用的缺点,我们来进一步的改造,ServerSocket在Accept之后,将连接放入另外的线程中,不阻塞Accept,这样就可以同时接受多个客户端的连接 /** * 多線程serversocket類,每次accept之後,馬上交到一個线程去处理 */ public class MyThreadServer extends Thread { private Socket client; public MyThreadServer(Socket c) { this.client = c; } public void run() { try { BufferedReader in = new BufferedReader( new InputStreamReader(client.getInputStream())); PrintWriter out = new PrintWriter(client.getOutputStream()); while (true) { System.out.println(client.getLocalPort()+","+client.getPort()+","+ client.getLocalAddress()+","+client.getRemoteSocketAddress()); String str = in.readLine(); System.out.println(str); out.println("has receive...."); out.flush(); if (str.equals("end")) break; } client.close(); } catch (IOException ex) { } finally { } } public static void main(String[] args) throws IOException { ServerSocket server = new ServerSocket(5678); while (true) { MyThreadServer mu = new MyThreadServer(server.accept()); mu.start(); } } } 这种方式阻塞在server.accept(),接到一个新的连接建立请求后,会马上放到一个新的线程去处理,在input.read处阻塞,线程调度器会充当select的工作(即发现哪个Socket有事件),缺点在于连接数太多的时候,线程也会增多,系统压力增长太快,特别是长连接的情况下或者客户端网络环境很差,每次不能全部发送数据,而是部分发送,这时候一个连接(线程)会被占用较长的时间 1.java线程机制本身占用的内存,在linux 64位系统上每个线程占1M内存 http://www.oracle.com/technetwork/java/hotspotfaq-138619.html#threads_oom 2.太多线程造成的上下文切换的开销问题 大部分网络通讯都有相似的结构,Read、Decode、Process、Encode reply、Send,区别在于业务逻辑,格式等,下图是对上面那种编程模式更系统的展示: 4 NIO上面的应用瓶颈,主要主要在于接和线程一一对应(内存开销,上下文切换开销),无法支持太多的连接(C10K问题),因此可以利用nio来改善,nio可以通过一个selector管理大量的连接,(不是每线程每连接) 下面的内容主要来自于:Scalable in java 4.1 Reactor先看一个最简单使用nio的selector的结构,使用非阻塞IO,Selector机制处理连接,请求来了之后再dispatch下去处理,类似一个响应(Reactor)模式。 public class SimpleReactor { protected Selector selector; public SimpleReactor(int port){ ServerSocketChannel server; try { server = ServerSocketChannel.open(); selector = Selector.open(); server.socket().bind(new InetSocketAddress(port)); server.configureBlocking(false); server.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException e) { } } private void listen() { try { for (;;) { int count=selector.select(); if(count==0) return; Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); // 处理事件 //handleKey(key); } } } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { int port = 12345; SimpleReactor server = new SimpleReactor(port); System.out.println("Listernint on " + port); server.listen(); } } 4.2 Multi Thread Handler上面可以看到Reactor要尽可能快的响应Channel状态的变化,他只处理连接,read,write数据;不能去处理逻辑,应该交给新的线程去处理,所以又变成了下面这样: private static ExecutorService tpe=Executors.newFixedThreadPool(5); private void listen() { try { for (;;) { int count=selector.select(); if(count==0) return; Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); tpe.execute(new Runnable() { @Override public void run() { // 处理事件 //handleKey(key); } }); } } } catch (IOException e) { e.printStackTrace(); } } 这种模式下Reactor负责基本的io read和send操作,数据读完以后再交给ThreadPool去处理 4.3 Multi Reactor上面的处理方式,如果有大量的读写事件,Reactor还是会因为IO操作而负荷过大,所以还有一种是Reactor也分成多个,每个Reactor拥有自己的Selector、Thread资源 Selector[] selectors; // also create threads int next = 0; class Acceptor { // ... public synchronized void run() { ... Socket connection = if (connection != null) new Handler(selectors[next], connection); if (++next == selectors.length) next = 0; } } 所以是下面这种结构 protected Selector[] selector; public ReactorPool(int port){ int count=5; ServerSocketChannel server; try { server = ServerSocketChannel.open(); selector =new Selector[count]; for(int i=0;i<count;i++){ selector[i]=Selector.open(); } server.socket().bind(new InetSocketAddress(port)); server.configureBlocking(false); //server只负责接受请求 //接受以后新创建的连接,注册到其他selector server.register(selector[0], SelectionKey.OP_ACCEPT); } catch (IOException e) { } } 在nio框架中,使用多个Selector能够显著的提高性能: 随着并发数量的提高,传统nio框架采用一个Selector来支撑大量连接,管理和触发连接已经遇到瓶颈,在mina和grizzly中都开始使用多个mina,直接上总结: 1、在处理大量连接的情况下,多个Selector比单个Selector好 4.4 Leader/Followerhttp://www./michael/publications/lf.pdf 5 小结上面基本包括了,在处理IO发送和接收的时候一些基本模式,如果要做成一个框架,比如类似netty,mina, 首先要对收发过程中缓存的处理进行更多的精细管理,nio本身也还有很多坑需要处理 其次要抽象出更多的接口,让用户能够处理; |
|
来自: 碧海山城 > 《通道和NIO基础》