我们在Linux的应用编程中,经常会使用的进程间通讯,pipe是一种常见的匿名管道通讯, 具体Linux内核如何实现系统调用呢?下面我们一起来分析 fs/pipe.c中我们可以看到,pipe被定义为系统调用 SYSCALL_DEFINE2(pipe2, int __user *, fildes, int, flags) { struct file *files[2]; int fd[2]; int error; error = __do_pipe_flags(fd, files, flags); if (!error) { if (unlikely(copy_to_user(fildes, fd, sizeof(fd)))) { fput(files[0]); fput(files[1]); put_unused_fd(fd[0]); put_unused_fd(fd[1]); error = -EFAULT; } else { fd_install(fd[0], files[0]); fd_install(fd[1], files[1]); } } return error; } SYSCALL_DEFINE1(pipe, int __user *, fildes) { return sys_pipe2(fildes, 0); } 可以看到SYSCALL_DEFINE1(pipe, int __user *, fildes)定义了系统调用pipe,主要参数是用户空间整形指针,用于回传文件描述符。 static int __do_pipe_flags(int *fd, struct file **files, int flags) { int error; int fdw, fdr; if (flags & ~(O_CLOEXEC | O_NONBLOCK | O_DIRECT)) return -EINVAL; error = create_pipe_files(files, flags); if (error) return error; error = get_unused_fd_flags(flags); if (error < 0) goto err_read_pipe; fdr = error; error = get_unused_fd_flags(flags); if (error < 0) goto err_fdr; fdw = error; audit_fd_pair(fdr, fdw); fd[0] = fdr; fd[1] = fdw; return 0; err_fdr: put_unused_fd(fdr); err_read_pipe: fput(files[0]); fput(files[1]); return error; } 从create_pipe_files(files, flags);可以看出,pipe的内核底层原理实质上是利用Linux文件子系统来实现管道的通讯,与实际文件不同的是没有向用户空间提供文件节点操作接口,在内存中直接简历文件,文件描述符等。 int create_pipe_files(struct file **res, int flags) { int err; struct inode *inode = get_pipe_inode(); struct file *f; struct path path; if (!inode) return -ENFILE; err = -ENOMEM; path.dentry = d_alloc_pseudo(pipe_mnt->mnt_sb, &empty_name); if (!path.dentry) goto err_inode; path.mnt = mntget(pipe_mnt); d_instantiate(path.dentry, inode); f = alloc_file(&path, FMODE_WRITE, &pipefifo_fops); if (IS_ERR(f)) { err = PTR_ERR(f); goto err_dentry; } f->f_flags = O_WRONLY | (flags & (O_NONBLOCK | O_DIRECT)); f->private_data = inode->i_pipe; res[0] = alloc_file(&path, FMODE_READ, &pipefifo_fops); if (IS_ERR(res[0])) { err = PTR_ERR(res[0]); goto err_file; } path_get(&path); res[0]->private_data = inode->i_pipe; res[0]->f_flags = O_RDONLY | (flags & O_NONBLOCK); res[1] = f; return 0; err_file: put_filp(f); err_dentry: free_pipe_info(inode->i_pipe); path_put(&path); return err; err_inode: free_pipe_info(inode->i_pipe); iput(inode); return err; } 通过 alloc_file(&path, FMODE_WRITE, &pipefifo_fops);可以看到,申请了内存file,通过第二个参数 FMODE_WRITE, FMODE_READ控制,该文件是读端还是写端,这就是我们为什么在编程过程中为什么要一端读一端写的原因,参数pipefifo_fops,这个函数回调集合很重要,它决定了上层读写最终调到内核的函数。 首先看一下 file申请过程,flags过滤 struct file *alloc_file(const struct path *path, fmode_t mode, const struct file_operations *fop) { struct file *file; file = get_empty_filp(); if (IS_ERR(file)) return file; file->f_path = *path; file->f_inode = path->dentry->d_inode; file->f_mapping = path->dentry->d_inode->i_mapping; file->f_wb_err = filemap_sample_wb_err(file->f_mapping); if ((mode & FMODE_READ) && likely(fop->read || fop->read_iter)) mode |= FMODE_CAN_READ; if ((mode & FMODE_WRITE) && likely(fop->write || fop->write_iter)) mode |= FMODE_CAN_WRITE; file->f_mode = mode; file->f_op = fop; if ((mode & (FMODE_READ | FMODE_WRITE)) == FMODE_READ) i_readcount_inc(path->dentry->d_inode); return file; } EXPORT_SYMBOL(alloc_file); 可以看到,通过不同标志位,设置不同文件读写模式,并将f_op指向pipefifo_fops结构体。 const struct file_operations pipefifo_fops = { .open = fifo_open, .llseek = no_llseek, .read_iter = pipe_read, .write_iter = pipe_write, .poll = pipe_poll, .unlocked_ioctl = pipe_ioctl, .release = pipe_release, .fasync = pipe_fasync, }; 最终整个管道的读写都会调到pipe_read、pipe_write回调函数中 static ssize_t pipe_read(struct kiocb *iocb, struct iov_iter *to) { size_t total_len = iov_iter_count(to); struct file *filp = iocb->ki_filp; struct pipe_inode_info *pipe = filp->private_data; int do_wakeup; ssize_t ret; /* Null read succeeds. */ if (unlikely(total_len == 0)) return 0; do_wakeup = 0; ret = 0; __pipe_lock(pipe); for (;;) { int bufs = pipe->nrbufs; if (bufs) { int curbuf = pipe->curbuf; struct pipe_buffer *buf = pipe->bufs + curbuf; size_t chars = buf->len; size_t written; int error; if (chars > total_len) chars = total_len; error = pipe_buf_confirm(pipe, buf); if (error) { if (!ret) ret = error; break; } written = copy_page_to_iter(buf->page, buf->offset, chars, to); if (unlikely(written < chars)) { if (!ret) ret = -EFAULT; break; } ret += chars; buf->offset += chars; buf->len -= chars; /* Was it a packet buffer? Clean up and exit */ if (buf->flags & PIPE_BUF_FLAG_PACKET) { total_len = chars; buf->len = 0; } if (!buf->len) { pipe_buf_release(pipe, buf); curbuf = (curbuf + 1) & (pipe->buffers - 1); pipe->curbuf = curbuf; pipe->nrbufs = --bufs; do_wakeup = 1; } total_len -= chars; if (!total_len) break; /* common path: read succeeded */ } if (bufs) /* More to do? */ continue; if (!pipe->writers) break; if (!pipe->waiting_writers) { /* syscall merging: Usually we must not sleep * if O_NONBLOCK is set, or if we got some data. * But if a writer sleeps in kernel space, then * we can wait for that data without violating POSIX. */ if (ret) break; if (filp->f_flags & O_NONBLOCK) { ret = -EAGAIN; break; } } if (signal_pending(current)) { if (!ret) ret = -ERESTARTSYS; break; } if (do_wakeup) { wake_up_interruptible_sync_poll(&pipe->wait, POLLOUT | POLLWRNORM); kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT); } pipe_wait(pipe); } __pipe_unlock(pipe); /* Signal writers asynchronously that there is more room. */ if (do_wakeup) { wake_up_interruptible_sync_poll(&pipe->wait, POLLOUT | POLLWRNORM); kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT); } if (ret > 0) file_accessed(filp); return ret; } static ssize_t pipe_write(struct kiocb *iocb, struct iov_iter *from) { struct file *filp = iocb->ki_filp; struct pipe_inode_info *pipe = filp->private_data; ssize_t ret = 0; int do_wakeup = 0; size_t total_len = iov_iter_count(from); ssize_t chars; /* Null write succeeds. */ if (unlikely(total_len == 0)) return 0; __pipe_lock(pipe); if (!pipe->readers) { send_sig(SIGPIPE, current, 0); ret = -EPIPE; goto out; } /* We try to merge small writes */ chars = total_len & (PAGE_SIZE-1); /* size of the last buffer */ if (pipe->nrbufs && chars != 0) { int lastbuf = (pipe->curbuf + pipe->nrbufs - 1) & (pipe->buffers - 1); struct pipe_buffer *buf = pipe->bufs + lastbuf; int offset = buf->offset + buf->len; if (buf->ops->can_merge && offset + chars <= PAGE_SIZE) { ret = pipe_buf_confirm(pipe, buf); if (ret) goto out; ret = copy_page_from_iter(buf->page, offset, chars, from); if (unlikely(ret < chars)) { ret = -EFAULT; goto out; } do_wakeup = 1; buf->len += ret; if (!iov_iter_count(from)) goto out; } } for (;;) { int bufs; if (!pipe->readers) { send_sig(SIGPIPE, current, 0); if (!ret) ret = -EPIPE; break; } bufs = pipe->nrbufs; if (bufs < pipe->buffers) { int newbuf = (pipe->curbuf + bufs) & (pipe->buffers-1); struct pipe_buffer *buf = pipe->bufs + newbuf; struct page *page = pipe->tmp_page; int copied; if (!page) { page = alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT); if (unlikely(!page)) { ret = ret ? : -ENOMEM; break; } pipe->tmp_page = page; } /* Always wake up, even if the copy fails. Otherwise * we lock up (O_NONBLOCK-)readers that sleep due to * syscall merging. * FIXME! Is this really true? */ do_wakeup = 1; copied = copy_page_from_iter(page, 0, PAGE_SIZE, from); if (unlikely(copied < PAGE_SIZE && iov_iter_count(from))) { if (!ret) ret = -EFAULT; break; } ret += copied; /* Insert it into the buffer array */ buf->page = page; buf->ops = &anon_pipe_buf_ops; buf->offset = 0; buf->len = copied; buf->flags = 0; if (is_packetized(filp)) { buf->ops = &packet_pipe_buf_ops; buf->flags = PIPE_BUF_FLAG_PACKET; } pipe->nrbufs = ++bufs; pipe->tmp_page = NULL; if (!iov_iter_count(from)) break; } if (bufs < pipe->buffers) continue; if (filp->f_flags & O_NONBLOCK) { if (!ret) ret = -EAGAIN; break; } if (signal_pending(current)) { if (!ret) ret = -ERESTARTSYS; break; } if (do_wakeup) { wake_up_interruptible_sync_poll(&pipe->wait, POLLIN | POLLRDNORM); kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN); do_wakeup = 0; } pipe->waiting_writers++; pipe_wait(pipe); pipe->waiting_writers--; } out: __pipe_unlock(pipe); if (do_wakeup) { wake_up_interruptible_sync_poll(&pipe->wait, POLLIN | POLLRDNORM); kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN); } if (ret > 0 && sb_start_write_trylock(file_inode(filp)->i_sb)) { int err = file_update_time(filp); if (err) ret = err; sb_end_write(file_inode(filp)->i_sb); } return ret; } 分析到这里,pipe的创建和调用已经结束了,谢谢你能内心看到最后!!!!!!!!!!!!! |
|