碧海山城 / 通道和NIO基础 / nio通道(4)--SocketChannel

分享

   

nio通道(4)--SocketChannel

2012-09-02  碧海山城

从之前的通道(1)--基础接口 大概知道了通道的基本特点,open/close/selectable/流式通道可以通过工厂方法open创建,文件通道比如在一个文件对象上调用getChannel来获取,下面来深入看下SocketChannel

 

这一篇比较偏向理论

如果想知道大概的代码怎么写,可以参考 SocketChannel续1---基本操作API 

如果想知道更多的坑,可以参考 SocketChannel续2---很多注意点

如果想知道一个nio框架的演变,可以参考SocketChannel续3--io框架模型演化

1      SocketIO

1.1      阻塞式的

之前的Socket/ServerSocket/DatagramSocket他们是阻塞式的,java IO的性能瓶颈所在

1.2      不便的读写操作

另外读写也相对不便,分别是getInputStream() getOutputStream() 然后使用他们的readwrite方法,或者用XXReader稍微包装一下,比如下面的

BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));

PrintWriter out = new PrintWriter(client.getOutputStream());

while (true) {

        String str = in.readLine();//socket读入数据

        out.println("has receive...."+str);

        out.flush();

        if (str.equals("end"))

            break;

}

2           SocketChannel

通道是新增的IO服务管道,并且提供交互方法,但是socket已有的协议API并不会重新实现,大多可以复用

 

全部Socket通道类(DatagramChannel/SocketChannel/ServerSocketChannel)在创建的时候都会创建一个对等的Socket对象,可以通过socket()方法获取,这种方式获取到的socket可以调用getChannel()获取对应的channel

 

2.1      可选择的通道

从最基础的层面来看,选择器提供了询问通道是否准备好执行每个I/O操作的能力,比如了解一个SocketChannel对象是否有更多的字节需要读取,或者需要知道ServerSocketChannel是否有需要准备接受的连接。他们都实现了SelectableChannel接口,因此可以实现就绪选择,这种方式的价值在于潜在的大量的通道同时进行就绪状态的检查。

 

2.2      自己实现选择通道?

假设自己实现对是否就绪通道的轮训,那么从效率上来说会有几个问题1.检查每个通道都需要一次系统调用,代价昂贵;2.检查不是原子性的,列表中的每一个通道都有可能在它被检查之后就绪,直到下一次轮训为止;3.不断地遍历,无法在某个感兴趣的通道就绪时得到通知;

传统的监控多个socket的方案是为每一个socket创建一个线程并使得线程在read中阻塞,直到数据可用。这里的被阻塞线程被当做了socket监控器,java虚拟机的线程调度当做了通知机制,这种方式在线程数量的增长失控的时候会造成巨大的压力。

因此真正的就绪选择操作必须由操作系统来做,它会处理I/O请求并通知各个线程他们的数据已经准备好了。

 

2.3      SocketChananel建立

SocketSocketChannel类封装点对点、有序的网络连接;每个SocketChannel对象创建时都是和一个对等的Socket对象关联的,静态的open方法可以创建一个新的SocketChannel(注意Socket的通道都是通过工厂方法open创建的),在Channel上调用socket方法能返回对等的Socket对象

新创建的Channel都是未连接的,可以调用connect方法去连接,连接之前尝试IO操作会导致NotYetConnectedException异常。如果说阻塞模式下,线程在连接建立好或超时之前会保持阻塞;在非阻塞模式下(没有超时的参数),他会发起连接请求,并且立即返回,如果是true,则说明连接已经建立(本地环回连接);如果不能连接,立即返回false,并且异步的继续尝试连接,这时候isConnectPending()会返回true,这时候可以调用finishConnect()来安全的完成连接过程

1.connect还未被调用,抛出NoConnectionPendingException

2.正在进行连接,未完成,那么finishConnect会立即返回false

3.非阻塞模式下,调用connect之后,SocketChannel可以调用configureBlocking()切换回阻塞模式,这时候调用finishConnect方法会阻塞直到连接建立完成

4.如果连接已经建立,那么调用finishConnect()方法会返回true,什么也不发生

 

当通道处于中间的连接等待(connectio n-pending)状态时,只可以调用 finishConnect( ) isConnectPending( )isConnected( ) 方法。一旦连接建立过程成功完成,isConnected( ) 将返回 true值。

 

InetSocketAddress addr = new InetSocketAddress (host, port);
SocketChannel sc = SocketChannel.open( );
sc.configureBlocking (false);
sc.connect (addr);
while ( ! sc.finishConnect( )) {
     doSomethingElse( );
}
doSomethingWithChannel (sc);
sc.close( );

 

3           三个基本元素

 
(Selector、SelectionKey、SocketChannel之间的关系)
 
(在使用nio的情况下,常会接触的API)
 

3.1      Selector

它管理着一组被注册的通道集合的信息和他们的就绪状态

 
3.2     
SelectableChannel

这个抽象类提供了实现通道的可选择性所需要的公共方法。它是所有支持就绪检查的通道类的父类;一个通道可以被注册到多个选择器上,可以通过isRegistered来检查一个通道是否被注册到任何一个选择器上,但对每个选择器而言只能被注册一次。

(注册之前要确保通道是非阻塞的,否则抛出异常)

SelectionKey wKey = channel.register(selector, SelectionKey.OP_WRITE);

 

3.3      SelectorKey

选择键封装了特定的通道与特定的选择器的注册关系,可以调用cancel方法终结这种关系。在通道的特定事件注册到选择器上之后,该选择键对象被返回并提供一个表示这种注册关系的标记,通过它可以得到channel,包括一些状态等。

 

选择键是基于位的操作,比如判断是否有写事件,如下代码

public final boolean isWritable() {

        return (readyOps() & OP_WRITE) != 0;

}

4           深入

4.1      selector创建和注册

ServerSocketChannel server = ServerSocketChannel.open();

Selector sel = Selector.open();// 创建,使用完毕之后调用close关闭

 

server.socket().bind(new InetSocketAddress(port));

server.configureBlocking(false); // 设置成非阻塞

 

// 注册感兴趣的事件,可以关联一个对象,下次通过SelectorKey获取(给Server注册read事件其实是无效的)

SelectionKey sk=server.register(sel, SelectionKey.OP_ACCEPT);

SelectionKey sk1=server.register(sel, SelectionKey.OP_READ&SelectionKey.OP_ACCEPT);

       

//任意时刻只有一种注册关系是有效的,实际上只是更新selectionkey的感兴趣集合,并不是新创建

//keyFor返回channel和该Selector的选择键

Assert.assertTrue(server.keyFor(sel)==sk);

Assert.assertTrue(server.keyFor(sel)==sk1);

 

int count=selector.select(100);

 

Select是一个阻塞操作,他会等待直到有感兴趣的事件,或者超过100m

 

注意到Selector是通过open这个工厂方法创建的,他会通过SelectorProvider来获取一个新的实例,关于SPI机制,请参考:Java SPI机制简介

 

4.2   selector的选择键集合以及选择过程

前面我们知道了,选择键(SelectionKey)代表了channelSelector的注册关系,可以调用cancel()取消:

//取消channelSelector的注册关系

sk1.cancel();

不过这种注销关系并不是立即生效的,实际上,选择器(Selector)会维护三种选择键(SelectionKey)的集合

 

1.已注册的键的集合(Registered key set,可能包括已经取消的键,通过keys()方法返回,无法修改

2.已准备好的选择键集合(Selected key set1的子集,每个成员都是选择器判断相关的通道已经准备好,并且包含于键的interest集合中的某种操作(比如read),通过selectedKeys方法返回。如果要确定是具体某种操作,请使用readyOps确定。

3.已取消的键集合,但是还没被注销

 

当一个selector操作被调用的时候,会发生下面的事情:

1.检查已取消的键集合,如果非空,则将这键从另外两个集合中移出

 

2.检查已注册的键集合,每个键的interest事件集合会被检查,之后修改interest也不会影响后面的过程,然后会执行底层的查询,直到有感兴趣的事件或者超时

a)   在操作系统确定一个键的某个interest事件发生的时候,会确定这个键有没有在已选择的键集合中,如果没,则键加入到选择的集合,清空ready集合,并且设置该感兴趣的事件到ready

b)   如果已经在已选择的键中,那么更新ready(位操作)

c)   之后会重新执行1,这样可以取消那些在选择过程中有变更的通道

 

3.返回的是从上一个select之后,处于就绪状态的通道数量,如果已经在就绪集合中,不会累计;比如我们注册read,并且在使用之后SelectionKey不移出,那么下次再有read事件过来,select方法可能会返回0就不会被记录,但是不代表没有感兴趣的事件

 

延迟注销键的操作,是为了在选择的过程中减少不必要的同步,不然注销和选择就要形成一定的互斥,因为注销潜在的代价比较高,可能需要释放各种资源。关于SelectionKeyreadinterest集合可以参考4.4

4.3   唤醒正在阻塞的selector操作

调用wakeup可以使阻塞在select方法的线程返回,当然如果当前没有阻塞的select方法,那么会让下一次select直接返回,他调用多次和一次的效果是一样的

 

不过很多时候,我们并不能确定是否有线程阻塞在select方法,也不想影响到下一次select(只是因为某些事件,临时唤醒一下),那可以在wakeup之后调用selectNow,他会立即返回,也抵消了wakeup的影响

 

4.4   支持的事件

在使用Selector的时候,需要将一个通道注册到上面,即:(通道在注册之前需要设置为非阻塞)

Selector selector = Selector.open( );

server.register(channel, SelectionKey.OP_READ | SelectionKey.OP_WRITE);

2个参数表示关心的通道操作,有4种:读(read)、写(write)、连接(connect)、接受(accept

 

不是所有的Channel都支持这些事件,比如SocketChannel不支持accept,可以通过validOps()来验证特定的通道所支持的操作集合。

Socket channels support connecting, reading, and writing, so this method returns (SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE).

Server-socket channels only support the accepting of new connections, so this method returns SelectionKey.OP_ACCEPT.

 

4.5   SelectionKey使用

 

4.5.1          interestready集合

一个Key包含两个以整数形式进行编码的比特掩码,

interest(集合):指示哪些通道/选择器组合所关心的操作

ready(集合):表示通道准备好要执行的操作

 

当前的interest集合可以通过调用键对象的interestOps()方法来获取,这个值不会被选择器改变,但是可以调用带参数的interestOps()方法改变,和上面一样,他会在下一次selector的时候生效;

 

key.interestOps()返回的是注册到该channel上的感兴趣的动作,key.readyOps()是该channel已经就绪的操作,ready集合是interest集合的子集,并且表示了interest集合中从上次调用select()以来已经就绪的那些操作,例如:注册channel感兴趣的动作是OP_READ,OP_WRITE,

sc.register(sel, SelectionKey.OP_WRITE|SelectionKey.OP_READ);

调用interestOps()可以得到他对readwrite感兴趣,但是如果该channel中没有数据,则只能是key.readyOps()==SelectionKey.OP_WRITE



比如下面的方法检查通道是否已经可读,并且读取

if ((key.readyOps( ) & SelectionKey.OP_READ) != 0){    

            myBuffer.clear( );    

            key.channel( ).read (myBuffer);    

            doSomethingWithBuffer (myBuffer.flip( ));

 

 

另外key也有便捷的方式可以检查:

if (key.isWritable( ))
等价于:
if ((key.readyOps( ) & SelectionKey.OP_WRITE) != 0)

 

另外,SelectionKey中还有一个attach方法啊,可以获得键对象中保存所提供的对象的引用

 

4.5.2  移除SelectionKey


为了表示已经处理了ready,只能将该SelectionKey从已选择的键集合中移出

Iterator<SelectionKey> iter = selector.selectedKeys().iterator();

while (iter.hasNext()) {

  /**

SelectionKey中的ready集合表示了他感兴趣的事件,他只能在选择的时候由底层修改,其他时候无  

 法修改,他表示的是合法的就绪信息(而不是可以任意设置的)

 所以为了表示已经处理了ready,只能将该SelectionKey从已选择的键集合中移出


另外,如果通道关闭,因为SelectionKey表示通道和selector的关系,

所以永远都会发生“关闭”事件,除非通道从slector移除(key.cancel())

*/

    SelectionKey key = iter.next();

    iter.remove();

    handleKey(key);

}


// 处理事件,Key可以同时表示多个事件到达

protected void handleKey(SelectionKey key) throws IOException {

    if (key.isAcceptable()) { 

        // 允许网络连接事件

        ServerSocketChannel server = (ServerSocketChannel) key.channel();

        SocketChannel channel = server.accept();

        channel.configureBlocking(false);

        // 网络管道准备处理读事件

        channel.register(selector, SelectionKey.OP_READ);

    else if (key.isReadable()) { 

    else if (key.isWritable()) {

        SocketChannel channel = (SocketChannel) key.channel();

    }

}


 

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多
    喜欢该文的人也喜欢 更多

    ×
    ×

    ¥.00

    微信或支付宝扫码支付:

    开通即同意《个图VIP服务协议》

    全部>>