在应用程序中,为了捕获信号(还捕获呢, 不就是一个处理吗)可以使用signal()函数来设置对应的信号的处理函数。
在异步非阻塞IO中,我们是可以同时发起多个传输操作。这需要每个操作都有一个唯一的上下文,这样才能在它们完成时区分到底是哪个传输操作完成了。在AIO中,
通过aiocb(AIO IO control Block)结构体进行区分,这个结构体如下: struct aiocb { int aio_fildes; /* File descriptor */ off_t aio_offset; /* File offset */ volatile void * aio_buf; /* Location of buffer */ size_t aio_nbytes; /* Length of transfer */ int aio_reqprio; /* Request priority offset */ struct sigevent aio_sigevent; /* Signal number and value */ int aio_lio_opcode; /* Operation to be performed */ }; 从上边我们可以看到,这个结构体包含了有关传输的所有信息,包括数据准备的用户缓冲区。在产生IO通知时,aiocb结构就被用来唯一标识所完成的IO操作。 AIO系列API中主要有下边几个函数: 1.int aio_read(struct aiocb *aiocbp) 该函数请求对一个有效的文件描述符进行异步读操作。在请求进行排队之后会立即返回,如果执行成功,返回值就为0,错误则返回-1并设置errno的值。 2.int aio_write(struct aiocb *aiocbp) 3.int aio_error(struct aiocb *aiocbp) 该函数用来确定请求的状态,可以返回EINPROGRESS(说明请求尚未完成),ECANCELLED(请求被应用程序取消了),-1(说明发生了错误,具体错误原因由error记录)。 4.ssize_t aio_return(struct aiocb *aiocbp) 由于并没有阻塞在read调用上,所以我们不能立即返回这个函数的返回状态,这是就要使用这个函数了,需要注意的是只有在aio_error调用确定请求已经完成(可能 已经完成,也可能发生了错误)之后,才能调用这个函数,这个函数的返回值就相当于同步情况下read或write系统调用的返回值(所传输的字节数,如果发生错误,则返回-1)。 5.int aio_suspend(const struct aiocb *const cblist[], int n ,const struct timespec *timeout) 用户可以通过这个函数来来挂起(或阻塞)调用进程,直到异步请求完成为止,此时会产生一个信号,或者发生其他超时操作。调用者提供了一个aiocb引用列表,其中任何一个完成都会导致给函数返回。 6.int aio_cancel(int fd ,struct aiocb *aiocbp) 该函数允许用户取消对某个文件描述符执行的一个或所有的IO请求。 如果要取消一个请求,用户需提供文件描述符和aiocb引用,如果这个请求被成功取消了,则返回AIO_CANCELED,如果该请求完成了,返回AIO_NOTCANCELED. 如果要取消对某个给定文件描述符的所有请求,用户需要提供这个文件的描述符以及一个aiocbp的NULL引用,如果所有请求被成功取消了,则返回AIO_CANCELED ,只要至少有一个没被取消,这个函数就返回AIO_NOT_CANCELED.如果没有一个请求可以被取消,该函数就会返回AIO_ALLDONE. 然后,可以使用aio_error来验证每个AIO请求,如果某个请求已经被返回了,那么aio_error就返回-1,并且error会被设置为ECANCELED. 7.int lio_listio(int mode ,struct aiocb *list[], int nent ,struct sigevent *sig) 这个操作使得用户可以在一个系统调用(一次内核上下文切换中启动大量的I/O操作)。其中,mode参数可以是LIO_WAIT或LIO_NOWAIT,前者会阻塞这个调用,直到所有的IO都完成为止,在操作进行排队之后,LIO_NOWAIT就会返回,list是一个aiocb引用的列表,最大元素的个数有nent定义的。如果list的元素为NULL,lio_lis tio()将被忽略。 光说理论也不行,是不?现在来点实际点的: a)用户空间读例程: #include <aio.h> .. int fd, set; struct aiocb my_aiocb; fd = open("file.txt", O_RDONLY); if( fd <0 ) { perror("open"); } //清零aiocb结构体 bzero((char *) &my_aiocb, sizeof(struct aiocb)); //为aiocb请求分配数据缓冲区 my_aiocb.aio_buf = malloc(BUFSIZE + 1); if(!my_aiocb.aio_buf) perror("malloc"); //初始化aiocb的成员 my_aiocb.aio_fildes = fd; my_aiocb.aio_nbytes = BUFSIZE; my_aiocb.aio_offset = 0; ret = aio_read(&my_aiocb); if(ret < 0) perror("aio_read"); while(aio_error(&my_aiocb) == EINPROGRESS) ; if((ret = aio_return(&my_iocb))) { // 获得异步读的返回值 } else { 读失败,分析errror } b)用户空间异步IO aio_suspend()函数使用例程 struct aioct *cblist(MAX_LIST) //清零aioct结构链表 bzero((char *)cblist, sizeof(cblist)); //将一个或更多的aiocb放入aioct结构体链表 cblist[0] = &my_aiocb; ret = aio_read( &my_aiocb); ret = aio_suspend( cblist, MAX_LIST, NULL); c)用户空间异步IO lio_list()函数使用例程 struct aiocb aiocb1,aiocb2; struct aiocb *list[MAX_LIST]; ... //准备第一个aiocb aiocb1.aio_fildes = fd; aiocb1.aio_buf = malloc(BUFSIZE +1); aiocb1.aio_nbytes = BUFSIZE; aiocb1.aio_offset = next_offset; aiocb1.aio_lio_opcode = LIO_READ;//异步读操作 ...//准备多个aiocb bzero((char *)list, sizeof(list)); //将aiocb填入链表 list[0] = &aiocb1; list[1] = &aiocb2; ... ret = lio_listio(LIO_WAIT, list, MAX_LIST, NULL); //发起大量IO操作 先上代码:使用信号作为AIO异步IO通知机制
void setup_io(..) { int fd; struct sigaction sig_act; struct aiocb my_aiocb; ... //设置信号处理函数 sigemptyset(&sig_act.sa_mask); sig_act.sa_flags = SA_SIGINFO; sig_act.sa_sigaction = aio_completion_handler; //设置AIO请求 bzero((char *)&my_aiocb, sizeof(struct aiocb)); my_aiocb.aio_flags = fd; my_aiocb.aio_buf = malloc(BUF_SIZE + 1); my_aiocb.aio_nbytes = BUF_SIZE; my_aiocb.offset = next_offset; //连接AIO请求和信号处理函数 my_aiocb.aio_sigevent.sigev_notify = SIGEV_SIGNVAL; my_aiocb.aio_sigevent.sigev_signo = SIGIO; my_aiocb.aic_sigevent.sigev_value.sival_ptr = &my_aiocb; //将信号和处理函数绑定 ret = sigaction(SIGION, &sig_act, NULL); ... ret = aio_read(&my_aiocb); } //信号处理函数 void aio_completion_handler(int signo, siginfo_t *info, void *context) { struct aiocb *req; //确定是我们需要的信号 if(info->si_signo == SIGIO) { req = (struct aiocb *)info->si_value.sival_ptr; //获得aiocb; //请求的操作是否完成 if(aio_error(req) ==0 ) { ret = aio_return(req); } } return ; } 从上边可以看到,使用AIO的应用程序同样需要定义信号处理函数,在指定的信号被产生时会触发调用这个处理程序。
“那么是不是就只能使用信号这种方式呢,我记得以前没一个知识点你都给我讲了好多方法,这个歌也不例外吧”小王说。 “嗯,真聪明,就喜欢聪明的女生”听到小王也懂得开动脑子了,我也要表示表示不是。 再上代码:使用回调函数最为AIO的通知 void setup_io(..) { ...//同上 //连接AIO请求和线程回调函数 my_aiocb.aio_sigevent.sigev_notify = SIGEV_THREAD; my_aiocb.aio_sigevent.notify_function = aio_completion_handler; //设置回调函数 my_aiocb.aio_sigevent.notify_attributes = NULL; my_aiocb.aio_sigevent.sigev_value.sival_ptr = &my_aiocb; ... ret = aio_read(&my_aiocb); } //信号处理函数 void aio_completion_handler(int signo, siginfo_t *info, void *context) { struct aiocb *req; req = (struct aiocb *)sigval.sival_ptr; //获得aiocb; //请求的操作是否完成 if(aio_error(req) ==0 ) { ret = aio_return(req); } return ; } 上述程序在创建aiocb请求之后,使用SIGEV_THREAD请求了一个线程回调函数作为通知方法。在回调函数中。通过(struct aiocb *)info->si_value.sival_ptr可以获得对应的aiocb指针,使用AIO函数可验证请求是否已经完成。 在Linux内核中,每个IO请求都对应一个kiocb结构体,其ki_filp成员指向对应的file指针,通过is_sync_kiocb可以判断某Kiocb时候为同步IO请求,如果非真,表示是异步IO请求。
块设备和网络设备本身就是异步的。只有字符设备驱动必须明确指出应支持AIO.需要说明的是AIO对于大多数字符设备而言都不是必须的。只有少数才需要。 在字符设备驱动程序中,file_operations包含了3个和AIO相关的函数。如下: ssize_t (*aio_read) (struct kiocb *iocb, char *buffer, size_t count ,loff_t offset); ssize_t (*aio_write) (struct kiocb *iocb, const char *buffer, size_t count ,loff_t offset); int (*aio_fsync) (struct kiocb *iocb, int datasync); aio_read()和aio_write()与file_operation中的read()和write()中的offset参数不同,它直接传递值,而后者传递的是指针。这两个函数本身也不一定完成读写操作,它只是发起,初始化读写操作。 下面来看看实际的代码部分: //异步读 static ssize_t xxx_aio_read(struct kiocb *iocb, char *buffer, size_t count ,loff_t offset) { return xxx_defer_op(0, iocb, buf, count, pos); } //异步写 static ssize_t xxx_aio_write(struct kiocb *iocb, const char *buffer, size_t count ,loff_t offset) { return xxx_defer_op(1, iocb, (char *)buf, count, pos); } //初始化异步IO static int xxx_defer_op(int write, struct kiocb *iocb, char *buf, size_t count, loff_t pos) { struct async_work *async_wk; int result; //当可以访问buffer时进行复制 if(write) { result = xxx_write (iocb->ki_filp, buf, count, &pos ); } else { result = xxx_read (iocb->ki_filp, buf, count, &pos ); } //如果是同步IOCB, 立即返回状态 if(is_sync_kiocb(iocb)) return resutl; //否则,推后几us执行 async_wk = kmalloc(sizeof(*async_wk), GFP_KERNEL )); if(async_wk==NULL) return result; async_wk->aiocb = iocb; async_ wk->result = result; INIT_WORK(&async_wk->work, xxx_do_deferred_op, async_wk); schedule_delayed_work(&async_wk->work, HZ/100); return -EIOCBOUEUED;//控制权限返回给用户空间 } //延迟后执行 static void xxx_do_deferred_op(void *p) { struct async_work *async_wk = (struct async_work*)p; aio_complete(async_wk_iocb, async_wk->result, 0); kfree(async_wk); } struct async_work { struct kiocb *iocb;//kiocb结构体指针 intresult;//执行结果 struct work_struct work; //工作结构体 }; 在上边代码中最核心的是使用aync_work结构体将操作延迟,通过schedule_delayed_work可以调度其运行,而aio_complete的调用用于通知内核驱动程序已经完成了操作。 |
|