配色: 字号:
Java 高并发八:NIO和AIO详解
2016-10-17 | 阅:  转:  |  分享 
  
Java高并发八:NIO和AIO详解

本文主要介绍Java高并发NIO和AIO的知识,这里整理了详细的资料,并详细介绍了1.什么是NIO2.Buffer3.Channel4.网络编程5.AIO的知识,有需要的小伙伴可以参考下

IO感觉上和多线程并没有多大关系,但是NIO改变了线程在应用层面使用的方式,也解决了一些实际的困难。而AIO是异步IO和前面的系列也有点关系。在此,为了学习和记录,也写一篇文章来介绍NIO和AIO。

1.什么是NIO

NIO是NewI/O的简称,与旧式的基于流的I/O方法相对,从名字看,它表示新的一套JavaI/O标准。它是在Java1.4中被纳入到JDK中的,并具有以下特性:

NIO是基于块(Block)的,它以块为基本单位处理数据(硬盘上存储的单位也是按Block来存储,这样性能上比基于流的方式要好一些)

为所有的原始类型提供(Buffer)缓存支持

增加通道(Channel)对象,作为新的原始I/O抽象

支持锁(我们在平时使用时经常能看到会出现一些.lock的文件,这说明有线程正在使用这把锁,当线程释放锁时,会把这个文件删除掉,这样其他线程才能继续拿到这把锁)和内存映射文件的文件访问接口

提供了基于Selector的异步网络I/O



所有的从通道中的读写操作,都要经过Buffer,而通道就是io的抽象,通道的另一端就是操纵的文件。

2.Buffer



Java中Buffer的实现。基本的数据类型都有它对应的Buffer

Buffer的简单使用例子:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18 packagetest;

importjava.io.File;

importjava.io.FileInputStream;

importjava.nio.ByteBuffer;

importjava.nio.channels.FileChannel;

publicclassTest{

publicstaticvoidmain(String[]args)throwsException{

FileInputStreamfin=newFileInputStream(newFile(

"d:\\temp_buffer.tmp"));

FileChannelfc=fin.getChannel();

ByteBufferbyteBuffer=ByteBuffer.allocate(1024);

fc.read(byteBuffer);

fc.close();

byteBuffer.flip();//读写转换

}

} 总结下使用的步骤是:

1.得到Channel

2.申请Buffer

3.建立Channel和Buffer的读/写关系

4.关闭

下面的例子是使用NIO来复制文件:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19 publicstaticvoidnioCopyFile(Stringresource,Stringdestination)

throwsIOException{

FileInputStreamfis=newFileInputStream(resource);

FileOutputStreamfos=newFileOutputStream(destination);

FileChannelreadChannel=fis.getChannel();//读文件通道

FileChannelwriteChannel=fos.getChannel();//写文件通道

ByteBufferbuffer=ByteBuffer.allocate(1024);//读入数据缓存

while(true){

buffer.clear();

intlen=readChannel.read(buffer);//读入数据

if(len==-1){

break;//读取完毕

}

buffer.flip();

writeChannel.write(buffer);//写入文件

}

readChannel.close();

writeChannel.close();

} Buffer中有3个重要的参数:位置(position)、容量(capactiy)和上限(limit)

这里要区别下容量和上限,比如一个Buffer有10KB,那么10KB就是容量,我将5KB的文件读到Buffer中,那么上限就是5KB。

下面举个例子来理解下这3个重要的参数:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24 publicstaticvoidmain(String[]args)throwsException{

ByteBufferb=ByteBuffer.allocate(15);//15个字节大小的缓冲区

System.out.println("limit="+b.limit()+"capacity="+b.capacity()

+"position="+b.position());

for(inti=0;i<10;i++){

//存入10个字节数据

b.put((byte)i);

}

System.out.println("limit="+b.limit()+"capacity="+b.capacity()

+"position="+b.position());

b.flip();//重置position

System.out.println("limit="+b.limit()+"capacity="+b.capacity()

+"position="+b.position());

for(inti=0;i<5;i++){

System.out.print(b.get());

}

System.out.println();

System.out.println("limit="+b.limit()+"capacity="+b.capacity()

+"position="+b.position());

b.flip();

System.out.println("limit="+b.limit()+"capacity="+b.capacity()

+"position="+b.position());

} 整个过程如图:



此时position从0到10,capactiy和limit不变。



该操作会重置position,通常,将buffer从写模式转换为读模式时需要执行此方法flip()操作不仅重置了当前的position为0,还将limit设置到当前position的位置。

limit的意义在于,来确定哪些数据是有意义的,换句话说,从position到limit之间的数据才是有意义的数据,因为是上次操作的数据。所以flip操作往往是读写转换的意思。



意义同上。

而Buffer中大多数的方法都是去改变这3个参数来达到某些功能的:

publicfinalBufferrewind()

将position置零,并清除标志位(mark)

publicfinalBufferclear()

将position置零,同时将limit设置为capacity的大小,并清除了标志mark

publicfinalBufferflip()

先将limit设置到position所在位置,然后将position置零,并清除标志位mark,通常在读写转换时使用

文件映射到内存

1

2

3

4

5

6

7

8

9

10

11

12 publicstaticvoidmain(String[]args)throwsException{

RandomAccessFileraf=newRandomAccessFile("C:\\mapfile.txt","rw");

FileChannelfc=raf.getChannel();

//将文件映射到内存中

MappedByteBuffermbb=fc.map(FileChannel.MapMode.READ_WRITE,0,

raf.length());

while(mbb.hasRemaining()){

System.out.print((char)mbb.get());

}

mbb.put(0,(byte)98);//修改文件

raf.close();

}

简单的多线程服务器:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19 publicstaticvoidmain(String[]args)throwsException{

ServerSocketechoServer=null;

SocketclientSocket=null;

try{

echoServer=newServerSocket(8000);

}catch(IOExceptione){

System.out.println(e);

}

while(true){

try{

clientSocket=echoServer.accept();

System.out.println(clientSocket.getwww.hunanwang.netRemoteSocketAddress()

+"connect!");

tp.execute(newHandleMsg(clientSocket));

}catch(IOExceptione){

System.out.println(e);

}

}

} 功能就是服务器端读到什么数据,就向客户端回写什么数据。

这里的tp是一个线程池,HandleMsg是处理消息的类。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23 staticclassHandleMsgimplementsRunnable{

省略部分信息

publicvoidrun(){

try{

is=newBufferedReader(newInputStreamReader(clientSocket.getInputStream()));

os=newPrintWriter(clientSocket.getOutputStream(),true);

//从InputStream当中读取客户端所发送的数据

StringinputLine=null;

longb=System.currentTimeMillis();

while((inputLine=is.readLine())!=null)

{

os.println(inputLine);

}

longe=System.currentTimeMillis();

System.out.println("spend:"+(e-b)+"ms");

}catch(IOExceptione){

e.printStackTrace();

}finally

{

关闭资源

}

}

} 1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18 publicstaticvoidmain(String[]args)throwsException{

Socketclient=null;

PrintWriterwriter=null;

BufferedReaderreader=null;

try{

client=newSocket();

client.connect(newInetSocketAddress("localhost",8000));

writer=newPrintWriter(client.getOutputStream(),true);

writer.println("Hello!");

writer.flush();

reader=newBufferedReader(newInputStreamReader(

client.getInputStream()));

System.out.println("fromserver:"+reader.readLine());

}catch(Exceptione){

}finally{

//省略资源关闭

}

} 以上的网络编程是很基本的,使用这种方式,会有一些问题:

为每一个客户端使用一个线程,如果客户端出现延时等异常,线程可能会被占用很长时间。因为数据的准备和读取都在这个线程中。此时,如果客户端数量众多,可能会消耗大量的系统资源。

解决方案:

使用非阻塞的NIO(读取数据不等待,数据准备好了再工作)

为了体现NIO使用的高效。

这里先模拟一个低效的客户端来模拟因网络而延时的情况:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27 privatestaticExecutorServicetp=Executors.newCachedThreadPool();

privatestaticfinalintsleep_time=100010001000;

publicstaticclassEchoClientimplementsRunnable{

publicvoidrun(){

try{

client=newSocket();

client.connect(newInetSocketAddress("localhost",8000));

writer=newPrintWriter(client.getOutputStream(),true);

writer.print("H");

LockSupport.parkNanos(sleep_time);

writer.print("e");

LockSupport.parkNanos(sleep_time);

writer.print("l");

LockSupport.parkNanos(sleep_time);

writer.print("l");

LockSupport.parkNanos(sleep_time);

writer.print("o");

LockSupport.parkNanos(sleep_time);

writer.print("!");

LockSupport.parkNanos(sleep_time);

writer.println();

writer.flush();

}catch(Exceptione)

{

}

}

} 服务器端输出:

spend:6000msspend:6000msspend:6000msspend:6001msspend:6002msspend:6002msspend:6002msspend:6002msspend:6003msspend:6003ms

因为

while((inputLine=is.readLine())!=null)

是阻塞的,所以时间都花在等待中。

如果用NIO来处理这个问题会怎么做呢?

NIO有一个很大的特点就是:把数据准备好了再通知我

而Channel有点类似于流,一个Channel可以和文件或者网络Socket对应。



selector是一个选择器,它可以选择某一个Channel,然后做些事情。

一个线程可以对应一个selector,而一个selector可以轮询多个Channel,而每个Channel对应了一个Socket。

与上面一个线程对应一个Socket相比,使用NIO后,一个线程可以轮询多个Socket。

当selector调用select()时,会查看是否有客户端准备好了数据。当没有数据被准备好时,select()会阻塞。平时都说NIO是非阻塞的,但是如果没有数据被准备好还是会有阻塞现象。

当有数据被准备好时,调用完select()后,会返回一个SelectionKey,SelectionKey表示在某个selector上的某个Channel的数据已经被准备好了。

只有在数据准备好时,这个Channel才会被选择。

这样NIO实现了一个线程来监控多个客户端。

而刚刚模拟的网络延迟的客户端将不会影响NIO下的线程,因为某个Socket网络延迟时,数据还未被准备好,selector是不会选择它的,而会选择其他准备好的客户端。

selectNow()与select()的区别在于,selectNow()是不阻塞的,当没有客户端准备好数据时,selectNow()不会阻塞,将返回0,有客户端准备好数据时,selectNow()返回准备好的客户端的个数。

主要代码:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180 packagetest;

importjava.net.InetAddress;

importjava.net.InetSocketAddress;

importjava.net.Socket;

importjava.nio.ByteBuffer;

importjava.nio.channels.SelectionKey;

importjava.nio.channels.Selector;

importjava.nio.channels.ServerSocketChannel;

importjava.nio.channels.SocketChannel;

importjava.nio.channels.spi.AbstractSelector;

importjava.nio.channels.spi.SelectorProvider;

importjava.util.HashMap;

importjava.util.Iterator;

importjava.util.LinkedList;

importjava.util.Map;

importjava.util.Set;

importjava.util.concurrent.ExecutorService;

importjava.util.concurrent.Executors;

publicclassMultiThreadNIOEchoServer{

publicstaticMapgeym_www.visa158.com_time_stat=newHashMap();

classEchoClient{

privateLinkedListoutq;

EchoClient(){

outq=newLinkedList();

}

publicLinkedListgetOutputQueue(){

returnoutq;

}

publicvoidenqueue(ByteBufferbb){

outq.addFirst(bb);

}

}

classHandleMsgimplementsRunnable{

SelectionKeysk;

ByteBufferbb;

publicHandleMsg(SelectionKeysk,ByteBufferbb){

super();

this.sk=sk;

this.bb=bb;

}

@Override

publicvoidrun(){

//TODOAuto-generatedmethodstub

EchoClientechoClient=(EchoClient)sk.attachment();

echoClient.enqueue(bb);

sk.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);

selector.wakeup();

}

}

privateSelectorselector;

privateExecutorServicetp=Executors.newCachedThreadPool();

privatevoidstartServer()throwsException{

selector=SelectorProvider.provider().openSelector();

ServerSocketChannelssc=ServerSocketChannel.open();

ssc.configureBlocking(false);

InetSocketAddressisa=newInetSocketAddress(8000);

ssc.socket().bind(isa);

//注册感兴趣的事件,此处对accpet事件感兴趣

SelectionKeyacceptKey=ssc.register(selector,SelectionKey.OP_ACCEPT);

for(;;){

selector.select();

SetreadyKeys=selector.selectedKeys();

Iteratori=readyKeys.iterator();

longe=0;

while(i.hasNext()){

SelectionKeysk=(SelectionKey)i.next();

i.remove();

if(sk.isAcceptable()){

doAccept(sk);

}elseif(sk.isValid()&&sk.isReadable()){

if(!geym_time_stat.containsKey(((SocketChannel)sk

.channel()).socket())){

geym_time_stat.put(

((SocketChannel)sk.channel()).socket(),

System.currentTimeMillis());

}

doRead(sk);

}elseif(sk.isValid()&&sk.isWritable()){

doWrite(sk);

e=System.currentTimeMillis();

longb=geym_time_stat.remove(((SocketChannel)sk

.channel()).socket());

System.out.println("spend:"+(e-b)+"ms");

}

}

}

}

privatevoiddoWrite(SelectionKeysk){

//TODOAuto-generatedmethodstub

SocketChannelchannel=(SocketChannel)sk.channel();

EchoClientechoClient=(EchoClient)sk.attachment();

LinkedListoutq=echoClient.getOutputQueue();

ByteBufferbb=outq.getLast();

try{

intlen=channel.write(bb);

if(len==-1){

disconnect(sk);

return;

}

if(bb.remaining()==0){

outq.removeLast();

}

}catch(Exceptione){

//TODO:handleexception

disconnect(sk);

}

if(outq.size()==0){

sk.interestOps(SelectionKey.OP_READ);

}

}

privatevoiddoRead(SelectionKeysk){

//TODOAuto-generatedmethodstub

SocketChannelchannel=(SocketChannel)sk.channel();

ByteBufferbb=ByteBuffer.allocate(8192);

intlen;

try{

len=channel.read(bb);

if(len<0){

disconnect(sk);

return;

}

}catch(Exceptione){

//TODO:handleexception

disconnect(sk);

return;

}

bb.flip();

tp.execute(newHandleMsg(sk,bb));

}

privatevoiddisconnect(SelectionKeysk){

//TODOAuto-generatedmethodstub

//省略略干关闭操作

}

privatevoiddoAccept(SelectionKeysk){

//TODOAuto-generatedmethodstub

ServerSocketChannelserver=(ServerSocketChannel)sk.channel();

SocketChannelclientChannel;

try{

clientChannel=server.accept();

clientChannel.configureBlocking(false);

SelectionKeyclientKey=clientChannel.register(selector,

SelectionKey.OP_READ);

EchoClientechoClinet=newEchoClient();

clientKey.attach(echoClinet);

InetAddressclientAddress=clientChannel.socket().getInetAddress();

System.out.println("Acceptedconnectionfrom"

+clientAddress.getHostAddress());

}catch(Exceptione){

//TODO:handleexception

}

}

publicstaticvoidmain(String[]args){

//TODOAuto-generatedmethodstub

MultiThreadNIOEchoServerechoServer=newMultiThreadNIOEchoServer();

try{

echoServer.startServer();

}catch(Exceptione){

//TODO:handleexception

}

}

} 代码仅作参考,主要的特点是,对不同事件的感兴趣来做不同的事。

当用之前模拟的那个延迟的客户端时,这次的时间消耗就在2ms到11ms之间了。性能提升是很明显的。

总结:

1.NIO会将数据准备好后,再交由应用进行处理,数据的读取/写入过程依然在应用线程中完成,只是将等待的时间剥离到单独的线程中去。

2.节省数据准备时间(因为Selector可以复用)

5.AIO

AIO的特点:

1.读完了再通知我

2.不会加快IO,只是在读完后进行通知

3.使用回调函数,进行业务处理

AIO的相关代码:

AsynchronousServerSocketChannel

server=AsynchronousServerSocketChannel.open().bind(newInetSocketAddress(PORT));使用server上的accept方法

publicabstractvoidaccept(Aattachment,CompletionHandlerhandler);CompletionHandler为回调接口,当有客户端accept之后,就做handler中的事情。

示例代码:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33 server.accept(null,

newCompletionHandler(){

finalByteBufferbuffer=ByteBuffer.allocate(1024);

publicvoidcompleted(AsynchronousSocketChannelresult,

Objectattachment){

System.out.println(Thread.currentThread().getName());

FuturewriteResult=null;

try{

buffer.clear();

result.read(buffer).get(100,TimeUnit.SECONDS);

buffer.flip();

writeResult=result.write(buffer);

}catch(InterruptedException|ExecutionExceptione){

e.printStackTrace();

}catch(TimeoutExceptione){

e.printStackTrace();

}finally{

try{

server.accept(null,this);

writeResult.get();

result.close();

}catch(Exceptione){

System.out.println(e.toString());

}

}

}

@Override

publicvoidfailed(Throwableexc,Objectattachment){

System.out.println("failed:"+exc);

}

}); 这里使用了Future来实现即时返回,关于Future请参考上一篇

在理解了NIO的基础上,看AIO,区别在于AIO是等读写过程完成后再去调用回调函数。

NIO是同步非阻塞的

AIO是异步非阻塞的

由于NIO的读写过程依然在应用线程里完成,所以对于那些读写过程时间长的,NIO就不太适合。

而AIO的读写过程完成后才被通知,所以AIO能够胜任那些重量级,读写过程长的任务。























献花(0)
+1
(本文系白狐一梦首藏)