分享

Socket通道续3---io框架模型演化

 碧海山城 2013-01-07

简单的clientserver通讯

下面的程序很简单,可以自己copy下来跑一下,客户端和服务端可以简单的通信

Client端代码

public class Client {


   public static void main(String[] args) throws Exception {

/**

         * 要指定ip和端口

         * 也可以使用bind指定本地的端口,不然是随机分配的,然后再connect

         */

    Socket socket = new Socket(InetAddress.getLocalHost(), 5678);

    //socket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 5674));

    //socket.connect(new InetSocketAddress(InetAddress.getLocalHost(), 5678));

    BufferedReader in = new BufferedReader(new 

InputStreamReader(socket.getInputStream()));

    PrintWriter out = new PrintWriter(socket.getOutputStream());

    BufferedReader wt = new BufferedReader(new InputStreamReader(System.in));

    while (true) {

            System.out.println("input sth");

            String str = wt.readLine();

            out.println(str);

            out.flush();

            if (str.equals("end")) {

                break;

            }

            System.out.println(in.readLine());

    }

    socket.close();

    }

}

Server端代码

public class MyServer {


    public static void main(String[] args) throws IOException {

        //new ServerSocket(port)相当于执行了bind然后accept

        ServerSocket server = new ServerSocket(5678);

        Socket client = server.accept();

        //相比较nio的buffer形式,以前的io确实麻烦不少

        //要获取到socket的InputStream和OutputStream

        BufferedReader in = new BufferedReader(

                new InputStreamReader(client.getInputStream()));

        PrintWriter out = new PrintWriter(client.getOutputStream());

        while (true) {

            String str = in.readLine();

            System.out.println(str);

            out.println("has receive...."+str);

            out.flush();

            if (str.equals("end"))

                break;

            }

            client.close();

        }

}

可以看到下面的输出================================

client:input sth

client:hello world

server:has receive....hello world

client:input sth

client:end

这种模式,单线程,一个|Server一次只接收一个请求


server端阻塞式的接收多个请求


上面的只是一个demo,服务端只能接收一次连接,下面改造一个,使用多线程,服务端不断的接收请求(客户端不变)


public class MySimgleThreadServer {

/**

 * accept放在while循环中,

      *可以實現和多client連接,但是缺點就是只有在一個client連接并处理完后,

 * 才能有新的client連接,不能多client同時連接

 * @param args

 * @throws IOException

 */

public static void main(String[] args) throws IOException {

    ServerSocket server = new ServerSocket(5678);

    while (true) {

        Socket client = server.accept();

        try{

                BufferedReader in = new BufferedReader(

new InputStreamReader(client.getInputStream()));

                PrintWriter out = new PrintWriter(client.getOutputStream());

                while (true) {

                    System.out.println(client.getLocalPort());

                    String str = in.readLine();

                    System.out.println(str);

                    out.println("has receive...."+str);

                    out.flush();

                    if (str.equals("end"))

                        break;

                }

                client.close();

        }catch(Exception e){

        }

    }

    }

}

这种模式,还是单线程,一个|Server可以处理多个请求,但是一次还是只能接收一个请求


使用多线程接收多个请求(bio)

知道上面应用的缺点,我们来进一步的改造,ServerSocketAccept之后,将连接放入另外的线程中,不阻塞Accept,这样就可以同时接受多个客户端的连接

/**

 * 多線程serversocket類,每次accept之後,馬上交到一個线程去处理

 */

public class MyThreadServer extends Thread {

    private Socket client;


    public MyThreadServer(Socket c) {

      this.client = c;

    }


    public void run() {

    try {

        BufferedReader in = new BufferedReader(

new InputStreamReader(client.getInputStream()));

        PrintWriter out = new PrintWriter(client.getOutputStream());

        while (true) {

            System.out.println(client.getLocalPort()+","+client.getPort()+","+

client.getLocalAddress()+","+client.getRemoteSocketAddress());

            String str = in.readLine();

            System.out.println(str);

            out.println("has receive....");

            out.flush();

            if (str.equals("end"))

                break;

            }

        client.close();

    catch (IOException ex) {

    finally {

    }

}


public static void main(String[] args) throws IOException {

    ServerSocket server = new ServerSocket(5678);

    while (true) {

        MyThreadServer mu = new MyThreadServer(server.accept());

        mu.start();

    }

 }

}

这种方式阻塞在server.accept(),接到一个新的连接建立请求后,会马上放到一个新的线程去处理在input.read处阻塞,线程调度器会充当select的工作(即发现哪个Socket有事件),缺点在于连接数太多的时候,线程也会增多,系统压力增长太快,特别是长连接的情况下或者客户端网络环境很差,每次不能全部发送数据,而是部分发送,这时候一个连接(线程)会被占用较长的时间


1.java线程机制本身占用的内存,在linux 64位系统上每个线程占1M内存

http://www.oracle.com/technetwork/java/hotspotfaq-138619.html#threads_oom

2.太多线程造成的上下文切换的开销问题


大部分网络通讯都有相似的结构,Read、DecodeProcessEncode replySend,区别在于业务逻辑,格式等,下图是对上面那种编程模式更系统的展示:

 
 

NIO

上面的应用瓶颈,主要主要在于接和线程一一对应(内存开销,上下文切换开销),无法支持太多的连接(C10K问题),因此可以利用nio来改善,nio可以通过一个selector管理大量的连接,(不是每线程每连接)

下面的内容主要来自于:Scalable in java

4.1 Reactor

先看一个最简单使用nioselector的结构,使用非阻塞IOSelector机制处理连接,请求来了之后再dispatch下去处理,类似一个响应(Reactor)模式。


public class SimpleReactor {


    protected Selector selector;

    public SimpleReactor(int port){

        ServerSocketChannel server;

        try {

            server = ServerSocketChannel.open();

            selector = Selector.open();

            server.socket().bind(new InetSocketAddress(port));

            server.configureBlocking(false);

            server.register(selector, SelectionKey.OP_ACCEPT);

        catch (IOException e) {

        }

    }


    private void listen() {

    try {

        for (;;) {

            int count=selector.select();

            if(count==0)

                return;

            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();

            while (iter.hasNext()) {

                SelectionKey key = iter.next();

                iter.remove();

                // 处理事件

                //handleKey(key);

            }

        }

    catch (IOException e) {

        e.printStackTrace();

    }

}

    public static void main(String[] args) {

        int port = 12345;

        SimpleReactor server = new SimpleReactor(port);

        System.out.println("Listernint on " + port);

        server.listen();

}


}

4.2 Multi Thread Handler

上面可以看到Reactor要尽可能快的响应Channel状态的变化,他只处理连接,readwrite数据;不能去处理逻辑,应该交给新的线程去处理,所以又变成了下面这样:

private static ExecutorService tpe=Executors.newFixedThreadPool(5);


private void listen() {

    try {

        for (;;) {

            int count=selector.select();

            if(count==0)

                return;

            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();

            while (iter.hasNext()) {

                SelectionKey key = iter.next();

                iter.remove();

                tpe.execute(new Runnable() {

                    @Override

                    public void run() {

                        // 处理事件

                        //handleKey(key);

                    }

            });

        }

    }

    catch (IOException e) {

        e.printStackTrace();

    }

}

这种模式下Reactor负责基本的io readsend操作,数据读完以后再交给ThreadPool去处理

4.3 Multi Reactor

上面的处理方式,如果有大量的读写事件,Reactor还是会因为IO操作而负荷过大,所以还有一种是Reactor也分成多个,每个Reactor拥有自己的SelectorThread资源

 

Selector[] selectors; // also create threads

int next = 0;

class Acceptor { // ... 

  public synchronized void run() { ...

Socket connection = 

    if (connection != null)

      new Handler(selectors[next], connection);

    if (++next == selectors.length) next = 0;

  }

}


所以是下面这种结构


protected Selector[] selector;


public ReactorPool(int port){

    int count=5;

    ServerSocketChannel server;

    try {

        server = ServerSocketChannel.open();

        selector =new Selector[count];


        for(int i=0;i<count;i++){

            selector[i]=Selector.open();

        }

        server.socket().bind(new InetSocketAddress(port));

        server.configureBlocking(false);


        //server只负责接受请求

        //接受以后新创建的连接,注册到其他selector

        server.register(selector[0], SelectionKey.OP_ACCEPT);

    catch (IOException e) {

    }

}

nio框架中,使用多个Selector能够显著的提高性能:

nio框架中的多个Selector结构

随着并发数量的提高,传统nio框架采用一个Selector来支撑大量连接,管理和触发连接已经遇到瓶颈,在minagrizzly中都开始使用多个mina,直接上总结:

1、在处理大量连接的情况下,多个Selector比单个Selector
2、多个Selector的情况下,处理OP_READOP_WRITESelector要与处理OP_ACCEPTSelector分离,也就是说处理接入应该要一个单独的Selector对象来处理,避免IO读写事件影响接入速度
3Selector的数目问题,mina默认是cpu+2,而grizzly总共就2个,我更倾向于mina的策略,但是我认为应该对cpu个数做一个判断,如果CPU个数超过8个,那么更多的Selector线程可能带来比较大的线程切换的开销mina默认的策略并非合适,幸好可以通过API设置这个数值。

4.4 Leader/Follower

http://www./michael/publications/lf.pdf

小结

上面基本包括了,在处理IO发送和接收的时候一些基本模式,如果要做成一个框架,比如类似nettymina

首先要对收发过程中缓存的处理进行更多的精细管理,nio本身也还有很多坑需要处理

其次要抽象出更多的接口,让用户能够处理;

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约