管道是作为一对文件描述符公开给进程的小型内核缓冲区,一个用于读取,一个用于写入。将数据写入管道的一端使得这些数据可以从管道的另一端读取。管道为进程提供了一种通信方式。
Shell正是通过I/O重定向和管道这种特殊的文件把多个程序的STDIN和STDOUT串联在一起组成更复杂功能的,下面是Shell中通过管道的示意图:
image.png

一、局限性

  1. 历史上,管道是半双工的
  2. 管道只能在具有公共祖先的两个进程之间使用

下面的示例代码使用连接到管道读端的标准输入来运行程序wc。

  1. int p[2];
  2. char *argv[2];
  3. argv[0] = "wc";
  4. argv[1] = 0;
  5. pipe(p); //调用pipe,创建一个新的管道,查找前两个fd,并将它们分配给管道的读写端,在数组p中记录读写fd
  6. if (fork() == 0) { // 子进程
  7. close(0); // 关闭文件描述符0,释放可被使用
  8. dup(p[0]); // 优先分配最小的文件描述符:0,fd(0)指向文件读取端
  9. close(p[0]);
  10. close(p[1]);
  11. exec("/bin/wc", argv);
  12. } else { //父进程
  13. close(p[0]); //关闭管道读取端
  14. write(p[1], "hello world\n", 12); //写入管道
  15. close(p[1]); //关闭写入端
  16. }

二、调用方法

int pipe(int fds[2]);
// fd[0]是管道读取端的fd(文件描述符)
// fd[1]是管道写入端的fd
// 成功时返回:0
// 错误时为-1
通常进程会先调用pipe,再调用fork,从而创建父进程到子进程的IPC通道。
image.png
image.png
从父进程到子进程的通道,父进程关闭读取端fd[0],子进程关闭写端fd[1]。
image.png
反之,从子进程到父进程的管道,父进程关闭写入端,子进程关闭读端。
当管道的一段被关闭后,下列两条规则起作用:

  1. 当read一个写端已被关闭的管道时,所有数据被读取后,read返回0,表示读取结束。
  2. 如果write一个读端已被关闭的管道,会产生信号SIGPIPE。如果忽略该信号或者捕捉该信号并从处理程序返回,则write返回-1,errno设置为EPIPE。 ```c

    include “kernel/types.h”

    include “user/user.h”

int main() { int fds[2]; char buf[100]; int n;

// create a pipe, with two FDs in fds[0], fds[1].
pipe(fds);

write(fds[1], "this is pipe1\n", 14);
n = read(fds[0], buf, sizeof(buf));

write(1, buf, n);

exit(0);

}

```c


#include "kernel/types.h"
#include "user/user.h"

// pipe2.c: communication between two processes

int
main()
{
  int n, pid;
  int fds[2];
  char buf[100];

  // create a pipe, with two FDs in fds[0], fds[1].
  pipe(fds);

  pid = fork();
  if (pid == 0) {
    write(fds[1], "this is pipe2\n", 14);
  } else {
    n = read(fds[0], buf, sizeof(buf));
    write(1, buf, n);
  }

  exit(0);
}

三、管道的实现

本文使用 Linux-2.6.23 内核作为分析对象。

1.环形缓冲区

在内核中,管道使用了环形缓冲区来存储数据。环形缓冲区的原理是:把一个缓冲区当成是首尾相连的环,其中通过读指针和写指针来记录读操作和写操作位置。如下图所示:
image.png
在 Linux 内核中,使用了 16 个内存页作为环形缓冲区,所以这个环形缓冲区的大小为 64KB(16 * 4KB)。
当向管道写数据时,从写指针指向的位置开始写入,并且将写指针向前移动。而从管道读取数据时,从读指针开始读入,并且将读指针向前移动。当对没有数据可读的管道进行读操作,将会阻塞当前进程。而对没有空闲空间的管道进行写操作,也会阻塞当前进程。

2.管道对象

在 Linux 内核中,管道使用 pipe_inode_info 对象来进行管理。我们先来看看pipe_inode_info 对象的定义,如下所示:

struct pipe_inode_info {
    wait_queue_head_t wait;
    unsigned int nrbufs,
    unsigned int curbuf;
    ...
    unsigned int readers;
    unsigned int writers;
    unsigned int waiting_writers;
    ...
    struct inode *inode;
    struct pipe_buffer bufs[16];
};

下面介绍一下pipe_inode_info对象各个字段的作用:

  • wait:等待队列,用于存储正在等待管道可读或者可写的进程。
  • bufs:环形缓冲区,由 16 个 pipe_buffer对象组成,每个 pipe_buffer 对象拥有一个内存页 ,后面会介绍。
  • nrbufs:表示未读数据已经占了环形缓冲区的多少个内存页。
  • curbuf:表示当前正在读取环形缓冲区的哪个内存页中的数据。
  • readers:表示正在读取管道的进程数。
  • writers:表示正在写入管道的进程数。
  • waiting_writers:表示等待管道可写的进程数。
  • inode:与管道关联的 inode 对象。

由于环形缓冲区是由 16 个 pipe_buffer 对象组成,所以下面我们来看看 pipe_buffer 对象的定义:

struct pipe_buffer {
    struct page *page;
    unsigned int offset;
    unsigned int len;
    ...
};

下面介绍一下 pipe_buffer 对象各个字段的作用:

  • page:指向 pipe_buffer对象占用的内存页。
  • offset:如果进程正在读取当前内存页的数据,那么 offset 指向正在读取当前内存页的偏移量。
  • len:表示当前内存页拥有未读数据的长度。

下图展示了 pipe_inode_info对象与 pipe_buffer 对象的关系:
image.png
管道的环形缓冲区实现方式与经典的环形缓冲区实现方式有点区别,经典的环形缓冲区一般先申请一块地址连续的内存块,然后通过读指针与写指针来对读操作与写操作进行定位。
但为了减少对内存的使用,内核不会在创建管道时就申请 64K 的内存块,而是在进程向管道写入数据时,按需来申请内存。
那么当进程从管道读取数据时,内核怎么处理呢?下面我们来看看管道读操作的实现方式。

3.读操作

从 经典的环形缓冲区 中读取数据时,首先通过读指针来定位到读取数据的起始地址,然后判断环形缓冲区中是否有数据可读,如果有就从环形缓冲区中读取数据到用户空间的缓冲区中。如下图所示:
image.png
而 管道的环形缓冲区 与 经典的环形缓冲区 实现稍有不同,管道的环形缓冲区 其读指针是由 pipe_inode_info 对象的 curbuf 字段与 pipe_buffer 对象的 offset 字段组合而成:

  • pipe_inode_info 对象的 curbuf 字段表示读操作要从 bufs 数组的哪个 pipe_buffer 中读取数据。
  • pipe_buffer 对象的 offset 字段表示读操作要从内存页的哪个位置开始读取数据。

读取数据的过程如下图所示:
image.png
从缓冲区中读取到 n 个字节的数据后,会相应移动读指针 n 个字节的位置(也就是增加 pipe_buffer 对象的 offset 字段),并且减少 n 个字节的可读数据长度(也就是减少 pipe_buffer 对象的 len 字段)。
pipe_buffer 对象的 len 字段变为 0 时,表示当前 pipe_buffer 没有可读数据,那么将会对 pipe_inode_info 对象的 curbuf 字段移动一个位置,并且其 nrbufs 字段进行减一操作。
我们来看看管道读操作的代码实现,读操作由 pipe_read 函数完成。为了突出重点,我们只列出关键代码,如下所示:

static ssize_t
pipe_read(struct kiocb *iocb, const struct iovec *_iov, unsigned long nr_segs,
          loff_t pos)
{
    ...
    struct pipe_inode_info *pipe;

    // 1. 获取管道对象
    pipe = inode->i_pipe;

    for (;;) {
        // 2. 获取管道未读数据占有多少个内存页
        int bufs = pipe->nrbufs;

        if (bufs) {
            // 3. 获取读操作应该从环形缓冲区的哪个内存页处读取数据
            int curbuf = pipe->curbuf;  
            struct pipe_buffer *buf = pipe->bufs + curbuf;
            ...

            /* 4. 通过 pipe_buffer 的 offset 字段获取真正的读指针,
             *    并且从管道中读取数据到用户缓冲区.
             */
            error = pipe_iov_copy_to_user(iov, addr + buf->offset, chars, atomic);
            ...

            ret += chars;
            buf->offset += chars; // 增加 pipe_buffer 对象的 offset 字段的值
            buf->len -= chars;    // 减少 pipe_buffer 对象的 len 字段的值

            /* 5. 如果当前内存页的数据已经被读取完毕 */
            if (!buf->len) {
                ...
                curbuf = (curbuf + 1) & (PIPE_BUFFERS - 1);
                pipe->curbuf = curbuf; // 移动 pipe_inode_info 对象的 curbuf 指针
                pipe->nrbufs = --bufs; // 减少 pipe_inode_info 对象的 nrbufs 字段
                do_wakeup = 1;
            }

            total_len -= chars;

            // 6. 如果读取到用户期望的数据长度, 退出循环
            if (!total_len)
                break;
        }
        ...
    }

    ...
    return ret;
}

上面代码总结来说分为以下步骤:

  • 通过文件 inode 对象来获取到管道的pipe_inode_info 对象。
  • 通过 pipe_inode_info 对象的 nrbufs 字段获取管道未读数据占有多少个内存页。
  • 通过 pipe_inode_info 对象的 curbuf 字段获取读操作应该从环形缓冲区的哪个内存页处读取数据。
  • 通过 pipe_buffer 对象的 offset字段获取真正的读指针, 并且从管道中读取数据到用户缓冲区。
  • 如果当前内存页的数据已经被读取完毕,那么移动 pipe_inode_info 对象的 curbuf 指针,并且减少其 nrbufs 字段的值。
  • 如果读取到用户期望的数据长度,退出循环。

    4.写操作

    分析完管道读操作的实现后,接下来,我们分析一下管道写操作的实现。
    经典的环形缓冲区 写入数据时,首先通过写指针进行定位要写入的内存地址,然后判断环形缓冲区的空间是否足够,足够就把数据写入到环形缓冲区中。如下图所示:
    image.png
    但 管道的环形缓冲区 并没有保存 写指针,而是通过 读指针 计算出来。那么怎么通过读指针计算出写指针呢?
    其实很简单,就是:
    写指针 = 读指针 + 未读数据长度
    下面我们来看看,向管道写入 200 字节数据的过程示意图,如下所示:
    如上图所示,向管道写入数据时:
    image.png

  • 首先通过 pipe_inode_infocurbuf 字段和 nrbufs 字段来定位到,应该向哪个 pipe_buffer 写入数据。

  • 然后再通过 pipe_buffer 对象的 offset 字段和 len 字段来定位到,应该写入到内存页的哪个位置。

下面我们通过源码来分析,写操作是怎么实现的,代码如下(为了特出重点,代码有所删减):

static ssize_t
pipe_write(struct kiocb *iocb, const struct iovec *_iov, unsigned long nr_segs,
           loff_t ppos)
{
    ...
    struct pipe_inode_info *pipe;
    ...
    pipe = inode->i_pipe;
    ...
    chars = total_len & (PAGE_SIZE - 1); /* size of the last buffer */

    // 1. 如果最后写入的 pipe_buffer 还有空闲的空间
    if (pipe->nrbufs && chars != 0) {
        // 获取写入数据的位置
        int lastbuf = (pipe->curbuf + pipe->nrbufs - 1) & (PIPE_BUFFERS-1);
        struct pipe_buffer *buf = pipe->bufs + lastbuf;
        const struct pipe_buf_operations *ops = buf->ops;
        int offset = buf->offset + buf->len;

        if (ops->can_merge && offset + chars <= PAGE_SIZE) {
            ...
            error = pipe_iov_copy_from_user(offset + addr, iov, chars, atomic);
            ...
            buf->len += chars;
            total_len -= chars;
            ret = chars;

            // 如果要写入的数据已经全部写入成功, 退出循环
            if (!total_len)
                goto out;
        }
    }

    // 2. 如果最后写入的 pipe_buffer 空闲空间不足, 那么申请一个新的内存页来存储数据
    for (;;) {
        int bufs;
        ...
        bufs = pipe->nrbufs;

        if (bufs < PIPE_BUFFERS) {
            int newbuf = (pipe->curbuf + bufs) & (PIPE_BUFFERS-1);
            struct pipe_buffer *buf = pipe->bufs + newbuf;
            ...

            // 申请一个新的内存页
            if (!page) {
                page = alloc_page(GFP_HIGHUSER);
                ...
            }
            ...
            error = pipe_iov_copy_from_user(src, iov, chars, atomic);
            ...
            ret += chars;

            buf->page = page;
            buf->ops = &anon_pipe_buf_ops;
            buf->offset = 0;
            buf->len = chars;

            pipe->nrbufs = ++bufs;
            pipe->tmp_page = NULL;

            // 如果要写入的数据已经全部写入成功, 退出循环
            total_len -= chars;
            if (!total_len)
                break;
        }
        ...
    }

out:
    ...
    return ret;
}

上面代码有点长,但是逻辑却很简单,主要进行如下操作:

  • 如果上次写操作写入的 pipe_buffer 还有空闲的空间,那么就将数据写入到此 pipe_buffer 中,并且增加其 len 字段的值。
  • 如果上次写操作写入的 pipe_buffer 没有足够的空闲空间,那么就新申请一个内存页,并且把数据保存到新的内存页中,并且增加 pipe_inode_infonrbufs 字段的值。
  • 如果写入的数据已经全部写入成功,那么就退出写操作。

    总结

    1. 为什么父子进程可以通过管道来通信?

    这是因为父子进程通过 pipe 系统调用打开的管道,在内核空间中指向同一个管道对象(pipe_inode_info)。所以父子进程共享着同一个管道对象,那么就可以通过这个共享的管道对象进行通信。

    2. 为什么内核要使用 16 个内存页进行数据存储?

    这是为了减少内存使用。
    因为使用 pipe 系统调用打开管道时,并没有立刻申请内存页,而是当有进程向管道写入数据时,才会按需申请内存页。当内存页的数据被读取完后,内核会将此内存页回收,来减少管道对内存的使用。