1、管道简介

管道是Linux提供的进程间通信机制之一,允许通信进程之间通过文件读写的方式单向传递数据。内核实现的文件系统pipefs,会在内核为每个管道文件分配一个的环形缓存区,以支持读/写操作。进程可以使用两种类型的管道进行通信:

  • 匿名管道:只支持在父子进程、兄弟进程之间通信。一般使用方式为,父进程调用pipe()创建匿名管道,fork()的子进程默认继承父进程打开的管道
  • 命名管道:支持任意的进程通过管道通信。进程通信前,需要先创建fifo类型特殊的文件,然后读写进程分别打开该文件进行读写

2、 管道机制

2.1 匿名使用示例

举一个常见的场景,我们在shell(如bash)执行下面的命令行,该命令行的意思是,将cat 命令执行的标准输出流,重定向至grep命令的输入流。其中'|'字符就是让bash知道,这里有左右两边的命令,需要通过管道进行数据传输。

~ $ cat '/etc/passwd' | grep 'docker'

忽略bash对命令行的解析过程,可以用下面一段简单的代码,展示bash是怎么实现命令之间的管道传输的。

#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/wait.h>
#include <stdlib.h>
    
int main(int argc, char** argv) {
    int fd[2];
    // fd[0]为管道写端, fd[1]为管道文件读端
    if (pipe(fd) != 0) {
        exit(errno);
    }
        
    int pid = fork();
    if (pid == -1) {
        exit(errno);
    } else if (pid == 0) {
        // 关闭标准输出
        close(1);
        // 利用dup(fd[1]])从管道写端复制一个描述符,新文件新的文件为最小可用的文件描述符,即上面关闭的1,所以这里是将标准输出重定向至管道写端
        dup(fd[1]);
        
        // 关闭管道读端
        close(fd[0]);
        // 关闭已复制的文件描述符
        close(fd[1]);
        
        // 重定向完成,加载命令
        execlp("cat", "/etc/passwd", NULL);
    } else {
        close(0);
        dup(fd[0]);
        close(fd[1]);
        close(fd[0]);
        execlp("grep", "docker" , NULL);
    }

}

我们知道,Linux进程中文件标识符0、1、2分别代表进程的标准输入、标准输出、标准错误流文件,默认是已打开状态,上面的过程简要介绍如下:

  • 10 ~ 13行:进程1调用pipe()命令创建管道文件,文件标识符保存于fd[]
  • 15 ~ 17行:进程1调用fork(),创建子进程2,此时两个进程还是执行bash的代码
  • 20 ~ 31行:进程1关闭原来的标准输出流,然后将管道的写端,置为新的标准输出流。最后执行execlp函数,加载cat程序并执行(加载后会保留已打开的文件,此时标准输出已被重定向)
  • 33 ~ 37行:与上面过程类似,进程2将管道的读端重定向至标准输入流,然后加载grep命令

这样,当cat执行print()或其它函数向标准输出流写数据时,grep使用scan()等函数即可从标准输入读到相应的数据,使用管道+重定向后的效果如下:

2.2 管道实现机制分析

进程使用管道通信时涉及的内核数据结构简介如下:

  • file struct:file对象对应进程中打开的文件,保存了该对象关联的inode及读写操作的函数。在这层可以提供管道单向传输的特性,如管道读端(fd[1])对应的file对象,只保存了读操作的函数,没有写操作函数。
  • inode struct:inode是Linux VFS的关键结构,唯一标识一个存在文件对象。可以看到,管道是在Linux VFS下实现的一种特殊文件,而对于同一个文件,无论有多少进程打开,只会对应着一个inode对象。
  • pipe_inode_info struct:管道的控制结构,主要包含了当前缓存区信息及读写者信息。其中curbuf表示第一个存在未读数据的块的缓存区编号,nrbufs表示有效缓冲区的长度,buffers表示全部缓存区的数目,reader/writer分别表示读写端的数目。

 管道读写过程分析

读写操作的实现在Linux源码的fs/pipe.c文件中,主要逻辑是对环形缓存区的读写。下面结合关键代码,介绍一些使用时需要注意的地方。

写操作分析

对所有读端都被关闭的管道进行写操作,内核会向进程发送SIGPIPE信号,触发进程相应的信号处理回调(默认是退出进程)。有时在shell执行命令,看到的'xxx: write error: broken pipe'就是这类情况,即'|'符号右边的命令已经退出了,左边的命令还在执行且向管道写数据。

static ssize_t
pipe_write(struct kiocb *iocb, const struct iovec *_iov, unsigned long nr_segs, loff_t ppos)
{
    struct file *filp = iocb->ki_filp;
    struct inode *inode = file_inode(filp);
    struct pipe_inode_info *pipe;
    ...
    mutex_lock(&inode->i_mutex);
    pipe = inode->i_pipe;
    
    // 读端计数为0,则向进程发送GIGPIPE信号
    if (!pipe->readers) {
        send_sig(SIGPIPE, current, 0);
        ret = -EPIPE;
        goto out;
    }
    ...
}

 每次写入都在空的缓存区上操作,数据量小于块大小(缓存区块大小和Page一样,为PAGE_BUF)时,数据会写入到同一个块中,否则切分到不同块。因此,多端同时写入时,一次write的大多数据可能写入到不连续的块中。如果没有空的缓存区块,且还有待写的数据时,会进入阻塞状态,直到有进程读了数据,空出了完整的缓存区块。

static ssize_t
pipe_write(struct kiocb *iocb, const struct iovec *_iov, unsigned long nr_segs, loff_t ppos)
{
    ...
    size_t total_len;
    ...
    total_len = iov_length(iov, nr_segs)
    ...
    for (;;) {
        int bufs;
        ...
        bufs = pipe->nrbufs;
        if (bufs < pipe->buffers) {
            ...
            // 一次最多写一个PAGE_SIZE大小的数据
            chars = PAGE_SIZE;
            if (chars > total_len)
                chars = total_len;
            ...
            // 从用户空间拷贝数据到缓存区
            error = pipe_iov_copy_from_user(src, iov, chars, atomic);
            ...
            // 不管缓冲区块是否写满,都切换至下一个缓冲区块
            pipe->nrbufs = ++bufs;
            ...
            total_len -= chars;
            if (!total_len)
                break;
        }
        if (bufs < pipe->buffers)
            continue;
        ...
        // 写入了新数据,唤醒阻塞的读进程
        if (do_wakeup) {
            wake_up_interruptible_sync_poll(&pipe->wait, POLLIN | POLLRDNORM);
            kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
            do_wakeup = 0;
        }
        pipe->waiting_writers++;
        // 阻塞至有新的缓存区块
        pipe_wait(pipe);
        pipe->waiting_writers--;
    }
out:
    mutex_unlock(&inode->i_mutex);
    ...
    return ret;
}

读操作分析

读操作会连续读缓存区的数据,直至读取了足够的数据为止,如果期间缓冲区的数据都被读取了,那就会阻塞直到有新数据写入或写端被关闭。

static ssize_t pipe_read(struct kiocb *iocb, const struct iovec *_iov, unsigned long nr_segs, loff_t pos)
{
    ...
    size_t total_len;
    // 目标读取数目
    total_len = iov_length(iov, nr_segs);
    ...
    pipe = inode->i_pipe;
    for (;;) {
        int bufs = pipe->nrbufs;
        if (bufs) {
            int curbuf = pipe->curbuf;
            struct pipe_buffer *buf = pipe->bufs + curbuf;
            ...
            size_t chars = buf->len;
            ...

            if (chars > total_len)
                chars = total_len;
            ...
            // 从缓冲区拷贝数据到进程
            addr = ops->map(pipe, buf, atomic);
            error = pipe_iov_copy_to_user(iov, addr + buf->offset, chars, atomic);
            ...
            // 更新已读区间
            ret += chars;
            buf->offset += chars;
            buf->len -= chars;
            ...
            if (!buf->len) {
                ...
                curbuf = (curbuf + 1) & (pipe->buffers - 1);
                pipe->curbuf = curbuf;
                pipe->nrbufs = --bufs;
                // 该缓存区块数据已经全部读完,可以唤醒阻塞的读端
                do_wakeup = 1;
            }
            total_len -= chars;
            if (!total_len)
                break;  /* common path: read succeeded */
        }
        // 还有未读的块,继续读取
        if (bufs)   /* More to do? */
            continue;
        // 不会再有新数据
        if (!pipe->writers)
            break;
        ...
        if (do_wakeup) {
            wake_up_interruptible_sync_poll(&pipe->wait, POLLOUT | POLLWRNORM);
            kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
        }
        pipe_wait(pipe);
    }
    ...
}

3、分析总结

  • 写所有读端关闭的管道写入数据,会产生SIGPIPE信号,写端进程应该要考虑对这个信号进行处理。
  • 一次写入管道的数据量不超过PAGE_BUF时,才能保证写入的原子性,如果存在多端写入的情况,应该避免数据长度过大。
  • 与其它文件操作类似,读写端都会阻塞。

命名管道的实现机制与匿名管道相同,具体可以参考fs/fifo.c文件的实现