分享

java的IO模型

 贪挽懒月 2022-06-20 发布于广东
写在前面

本文主要是重新梳理了Java的IO模型,基于之前NIO的文章进行补充,为学习Netty做准备。

IO模型

1、什么是IO模型:
简单地说,就是用什么样的通道进行数据的发送和接收。比如通道是阻塞的还是非阻塞的,是同步还是异步的。

2、Java支持的IO模型:
java支持的IO模型有:

  • BIO:就是JDK原生的IO,同步并且是阻塞的。在用BIO进行网络通信时,服务端的实现模式为一个连接一个线程,即客户端有连接请求时服务端就要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销。适用场景:连接数比较小且固定的架构,程序简单易于理解。

BIO模型示意图
  • NIO:同步非阻塞的IO,服务端的实现模式为一个线程处理多个请求,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有IO请求就进行处理,下图中的selector就是这个多路复用器。当然server也可以启动多个线程,一个线程维护一个selector,一个selector维护多个client。Netty就是基于NIO。适用场景:连接数多且连接比较短,比如聊天服务器、弹幕系统、服务器之间的通讯等,编程比较复杂。

NIO模型示意图
  • AIO:异步非阻塞的IO,JDK1.7开始出现的,目前还没得到广泛的应用。适用场景:连接数多且连接比较长的重架构,编程比较复杂

BIO

二、BIO

1、BIO编程流程:

  • 服务器端启动一个serverSocket;

  • 客户端启动一个socket对服务端进行通信,默认情况下服务端需要对每个客户端连接建立一个线程与之通信;

  • 客户端发出连接请求后,先咨询服务端是否有线程响应,如果没有则会等待或者遭到拒绝;如果有线程响应,客户端线程会等待请求结束后继续执行;

2、BIO的应用实例:

  • 需求:使用BIO编写一个服务端,监听6666端口,当有客户端连接时,就启动一个线程与之通信。使用线程池机制进行改善,使其可以连接多个客户端。服务端可以接收客户端发送的数据。

  • 代码:

public class BioServer { 
    public static void main(String[] args) throws IOException {
        // 1. 创建线程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        // 2. 创建serverSocket并监听端口
        ServerSocket serverSocket = new ServerSocket(6666);
        System.out.println("服务端已启动");
        // 3. 等待客户端连接
        System.out.println("等待连接……");
        while (true) {
            final Socket socket = serverSocket.accept();
            System.out.println("客户端连接进来了");
            // 4. 创建一个线程与之通信
            executorService.execute(new Runnable() {
                @Override
                public void run(
{
                    handler(socket);
                }
            });
        }
    }

    public static void handler(Socket socket{
        try {
            byte[] bys = new byte[1024];
            // 通过socket获取输入流
            InputStream inputstream = socket.getInputStream();
            // 循环读取客户端发送的数据
            while (true) {
                System.out.println("线程id:" + Thread.currentThread().getId() + ",线程名:" + Thread.currentThread().getName());
                System.out.println("reading……");
                int read = inputstream.read(bys);
                // 不等于负一表示还没读取完
                if (read != -1) {
                    System.out.println(new String(bys, 0, read));
                } else {
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭socket连接
            try { socket.close(); } catch (IOException e) { e.printStackTrace(); }
        }
    }
}

这段代码就是按照需求编写了一个服务端。启动这个类,然后在CMD窗口输入telnet 127.0.0.1 6666回车,然后按ctrl + ],就进入了telnet,输入send hello,服务端控制台就会打印出hello以及线程信息。可以发现,当我们启动服务端后,控制台会打印出等待连接……,然后就卡在这里不动了,当我们通过telnet连接后,会打印出reading……,并且卡在那里,说明这是阻塞的。我们启动两个telnet去连接,通过控制台打印的线程id可以发现,处理这两个客户端连接的是两个线程,这与之前的模型分析一致

NIO

三、NIO

1、NIO三大核心部分:

  • Channel:通道,可以理解为是铁路;

  • Buffer:缓冲区,可以理解为火车;程序不是直接通过channel读写数据,而是通过buffer。这很好理解,火车装着货物跑到铁路上,对应了buffer装着数据跑在channel上;

  • Selector:选择器,就是上面NIO模型图中的selector,selector发现这个通道有内容要读取,就处理这个通道,如果这个通道没啥事儿,它不会阻塞在这里等这个通道,而是去看别的通道有没有内容要读取,如果都没有,管理selector的这个线程还可以去做别的事。

selector、buffer、channel之间的关系:

  • 每个channel都会对应一个buffer;一个channel可以理解为就是一个连接;

  • 一个selector对应一个线程;一个selector对应多个channel;

  • 程序切换到哪个channel是由事件决定;

  • selector会根据不同的事件,在各通道上切换;

  • buffer底层是一个数组;

  • 数据的读取和写入是通过buffer来完成的;BIO的读取和写入是通过输入输出流,不能双向,而buffer是双向的;

2、buffer:
buffer有四个重要的属性:

  • capacity:容量,该buffer能够容纳的最大数据量,缓冲区创建时被设定且不能修改;

  • limit:缓冲区当前的终点,不能对缓冲区超出终点的位置进行读写,limit是可变的;

  • position:下一个要被读或写的元素的索引,每次读写都会改变该值;

  • mark:标记;

buffer属性

读取数据的时候可以设置position和limit,表示从哪儿开始读,读到哪儿结束。

3、channel:
channel类似BIO的流,但是有些区别,如下:

  • 通过buffer,可以同时进行读写,而流只能读或者写;

  • 通道可以实现异步读写数据;

  • 通道可以从缓冲区读数据,也可以写数据到缓冲区;

channel是一个接口,用得比较多的实现有如下几个:

  • FileChannel:对文件进行操作的;

  • DatagramChannel:通过 UDP 读写网络中的数据通道;

  • ServerSocketChannel:类似ServerSocket;

  • SocketChannel:类似Socket

---

看几个实操案例:

  • 通过ByteBuffer和FileChannel将“带你去爬山”这句话写入到test01.txt中:

public class NioFileChannel01 {

    public static void main(String[] args) throws IOException {
        String str = "带你去爬山啊";
        FileOutputStream fos = new FileOutputStream("C:\\Users\\14751\\Desktop\\test01.txt");
        // 1. 通过FileOutputStream获取对应的FileChannel
        FileChannel fc = fos.getChannel();
        // 2. 创建缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        // 3. 将str放入buffer中
        buffer.put(str.getBytes());
        // 4. 切换写数据模式
        buffer.flip();
        // 5. 将buffer数据写入到通道
        fc.write(buffer);
        // 6. 关闭资源
        fos.close();
        fc.close();
    }
}
  • 通过ByteBuffer和FileChanneltest01.txt文件中的内容:

public class NioFileChannel02 {

    public static void main(String[] args) throws IOException {
        // 1. 读取test01.txt文件
        File file = new File("C:\\Users\\14751\\Desktop\\test01.txt");
        // 2. 将file转成FileInputStream
        FileInputStream fis = new FileInputStream(file);
        // 3. 获取通道
        FileChannel channel = fis.getChannel();
        // 4. 创建缓冲区
        ByteBuffer buffer = ByteBuffer.allocate((int)file.length());
        // 5. 将通道数据读到buffer中
        channel.read(buffer);
        System.out.println(new String(buffer.array()));
        // 6. 关闭资源
        fis.close();
        channel.close();
    }
}
  • 使用FileChannel的read和write方法完成对文件的拷贝:

public class NioFileChannel03 {

    public static void main(String[] args) throws IOException {
        // 1. 读取源文件
        FileInputStream fis = new FileInputStream("C:\\Users\\14751\\Desktop\\test01.txt");
        // 2. 获取通道
        FileChannel sourceChannel = fis.getChannel();

        // 3. 加载目标文件
        FileOutputStream fos = new FileOutputStream("C:\\Users\\14751\\Desktop\\test02.txt");
        // 4. 获取通道
        FileChannel targetChannel = fos.getChannel();

        // 5. 创建缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        while (true) {
            // 6. 标志位复位,一定不能漏了这步,否则死循环
            buffer.clear();
            // 7. 读取数据
            int read = sourceChannel.read(buffer);
            if (read == -1) {
                break;
            }
            // 8. 切换到写数据模式,并将buffer中的数据写入到targetChannel
            buffer.flip();
            targetChannel.write(buffer);
        }
        // 9. 关闭资源
        fis.close();
        sourceChannel.close();
        fos.close();
        targetChannel.close();
    }
}
  • 使用transferFrom完成对文件的拷贝:

public class NioFileChannel04 {

    public static void main(String[] args) throws IOException {
        // 1. 读取源文件
        FileInputStream fis = new FileInputStream("C:\\Users\\14751\\Desktop\\test01.txt");
        // 2. 获取通道
        FileChannel sourceChannel = fis.getChannel();

        // 3. 加载目标文件
        FileOutputStream fos = new FileOutputStream("C:\\Users\\14751\\Desktop\\test03.txt");
        // 4. 获取通道
        FileChannel targetChannel = fos.getChannel();

        // 5. 使用transferFrom完成拷贝
        targetChannel.transferFrom(sourceChannel, 0, sourceChannel.size());

        // 6. 关闭资源
        fis.close();
        sourceChannel.close();
        fos.close();
        targetChannel.close();
    }
}
  • MappedByteBuffer:可以让文件直接在内存中修改,操作系统不需要拷贝,用法如下:

public class NioFileChannel05 {

    public static void main(String[] args) throws IOException {
        // 1. 加载文件
        RandomAccessFile file = new RandomAccessFile("C:\\Users\\14751\\Desktop\\test01.txt""rw"); // rw表示读写
        // 2. 获取文件通道
        FileChannel channel = file.getChannel();
        // 3. 获取MappedByteBuffer,这三个参数,第一个表示读写模式,第二个表示直接修改的起始位置,第三个表示映射到内存中的大小
        MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 05);
        // 4. 对test01.txt进行修改
        buffer.put(0, (byte)'A'); // 第一个字符改成A
        buffer.put(1, (byte)'B'); // 第二个字符改成B
        // 5. 关闭资源
        file.close();
        channel.close();
    }
}
  • 分散和聚集:上面的案例,都是通过一个buffer来完成的,如果数据比较多,读取数据的时候,可以将数据读取到多个buffer中,同样写数据的时候,可以将多个buffer的数据写入到channel中,案例如下:

/**
 * scattering:将数据写入到buffer时,可以采用buffer数组,依次写入
 * gathering:从buffer读数据的时候,可以采用buffer数组,依次读取
 * @author zhu
 *
 */

public class NioFileChannel06 {

    public static void main(String[] args) throws IOException {
        // 1. 创建channel
        ServerSocketChannel serverChannel  = ServerSocketChannel.open();
        // 2. 绑定端口并启动
        InetSocketAddress address = new InetSocketAddress(6666);
        serverChannel.socket().bind(address);
        // 3. 创建buffer数组
        ByteBuffer[] buffers = new ByteBuffer[2];
        buffers[0] = ByteBuffer.allocate(5);
        buffers[1] = ByteBuffer.allocate(4);
        // 4. 等待客户端连接
        SocketChannel channel = serverChannel.accept();
        // 5. 循环读取
        // 假设客户端会发送8个字节
        int len  = 8;
        while (true) {
            int read = 0;
            while (read < len) {
                long byteNum = channel.read(buffers);
                read += byteNum;
                System.out.println("读取到的字节数:" + read);
            }
            // 6. 切换模式
            Arrays.asList(buffers).forEach(buffer -> buffer.flip());
            // 7. 将读取到的数据显示到客户端
            long writeLen = 0;
            while (writeLen < len) {
                long byteNum = channel.write(buffers);
                writeLen += byteNum;
            }
            // 8. 将所有buffer进行clear
            Arrays.asList(buffers).forEach(buffer -> buffer.clear());
        }
    }
}

4、selector:
selector能够检测多个通道是否有事件要发生,多个channel以事件的方式可以注册到同一个selector中。主要工作流程如下:

  • 当客户端连接时,会通过severSocket channel得到对应的socketChannel,并且将socketChannel通过register方法注册到selector中,注册后返回一个selectionKey;

  • selector通过集合关联这个selectionKey;

  • selector通过select方法进行监听(select方法是阻塞的,也可以传入超时时间,阻塞指定的时间,还可以用selectNow方法,这个就是非阻塞的;NIO的非阻塞也就体现在这里),返回有事件发生的通道的个数;

  • selector可以得到有事件发生的通道的selectionKey;

  • 通过selectionKey,就可以得到它对应的通道,然后就可以完成业务操作了。

---

看一个实操案例:用NIO实现服务端和客户端的通讯:

  • 服务端:

public class NIOServer {

    public static void main(String[] args) throws IOException {
        // 1. 创建NIOServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        // 2. 得到Selector对象
        Selector selector = Selector.open();

        // 3. 绑定端口,进行监听
        serverSocketChannel.socket().bind(new InetSocketAddress(6666));

        // 4. 设置为非阻塞
        serverSocketChannel.configureBlocking(false);

        // 5. 把serverSocketChannel注册到selector中,设置关心事件为 OP_ACCEPT
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        // 6. 循环等待客户端连接
        while (true) {
            if (selector.select(1000) == 0) { // 没有事件
                System.out.println("服务器等待了1秒钟,没有事件发生");
                continue;
            } else { // 有事件
                // 7. 有事件发生,就拿到selectionKey的集合
                Set<SelectionKey> selectionKeys = selector.selectedKeys();

                // 8. 通过selectionKeys得到channel
                Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();

                    // 9. 根据key的不同事件,做对应的处理
                    if (key.isAcceptable()) { // 如果是OP_ACCEPT连接事件
                        // 10. 为该客户端生成一个SocketChannel并设置成非阻塞
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        socketChannel.configureBlocking(false);
                        // 11. 将当前socketChannel也注册到selector中,关注事件为OP_READ,并且关联一个Buffer
                        socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));

                    }
                    if (key.isReadable()) { // 如果是OP_READ读取事件
                        // 12. 通过key得到channel
                        SocketChannel channel = (SocketChannel) key.channel();
                        // 13. 获取到该channel关联的buffer
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        // 14. 将channel中的数据读到buffer中去
                        channel.read(buffer);
                        System.out.println("客户端发送的数据:" + new String(buffer.array()));
                    }
                    // 15. 移除当前的selectionKey,防止重复操作
                    keyIterator.remove();
                }
            }
        }
    }
}
  • 客户端:

public class NIOClient {

    public static void main(String[] args) throws IOException {
        // 1. 设置ip和端口
        InetSocketAddress address = new InetSocketAddress("127.0.0.1"6666);

        // 2. 创建SocketChannel并设置成非阻塞
        SocketChannel socketChannel = SocketChannel.open(address);
        socketChannel.configureBlocking(false);

        // 3. 连接服务器
        String str = "hello world";
        ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());

        // 4. 将数据写入channel
        socketChannel.write(buffer);
        System.in.read();
    }

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多