消息队列概念

消息队列可以看作一个消息链表,有足够写权限的线程可往队列中放置消息,有足够读权限的线程可从队列中取走消息。每个消息都是一个记录,它由发送者赋予一个优先级。在某个进程往一个队列写入消息之前,并不需要另外某个进程在该队列上等待消息的到达(恰好与 PIPE 和 FIFO 相反,因为管道中除非读出者已存在,否则先有写入者是没有意义的)。
一个进程可以往某个队列写入一些消息,然后终止,再让另外一个进程在以后某个时刻读出这些消息。因为消息队列具有随内核的持续性,和管道和FIFO不一样,管道的最后一次关闭发生时,仍在该管道上的数据将被丢弃。
本文只说明使用 POSIX 消息队列,和 SystemV 消息队列相比,虽然有区别,但是也存在很多相似性。
其差别主要如下:

  1. Posix 消息队列的读总是返回最高优先级的最早消息,SystemV 消息队列的读则可以返回任意指定优先级的消息。
  2. 当往一个空队列放置一个消息时,Posix 消息队列允许产生一个信号或启动一个线 程, SystemV 消息队列则不提供类似机制。
  3. 一个消息中,Posix 是一个无符号整数优先级,SystemV 一个长整数类型。

需要注意的是:消息队列特征不同于管道,管道中是字节流模型,没有消息边界,也没有和每个消息关联的类型。而消息队列地可能布局如下:
image.png

mq_open、mq_close、mq_unlink函数

  • mq_open函数

mq_open 函数用于创建一个新的消息队列或打开一个已存在的消息队列。其函数原型为:

  1. #include <fcntl.h> /* For O_* constants */
  2. #include <sys/stat.h> /* For mode constants */
  3. #include <mqueue.h>
  4. mqd_t mq_open(const char *name, int oflag);
  5. mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);

参数说明:
name:IPC名字,在之前的 IPC 概念中已经提过。
oflag:参数是 O_RDONLY、O_WRONLY 或 O_RDWR 之一,可能按位或上 O_CREAT、 O_EXCL 或 O_NONBLOCK。在之前的 IPC 概念中也已经提过。
当实际操作是创建一个新队列时(已指定 O_CREAT 标志,且所请求的消息队列尚未存在), mode 和 attr 参数是需要的。mode 参数在之前的 IPC 概念中提过,attr 参数用于给新队列指定某些属性。
返回值:
其返回值称为消息队列描述符,但它不必是(而且很可能不是)像文件描述符或套接字描述符这样的短整数。这个值用作其余 7 个消息队列相关函数的第一个参数。(注:Linux 下 mqd_t 是 int)。出错返回 -1。

  • mq_close 函数

已经打开的消息队列是由 mq_close 关闭的,其函数声明如下:

  1. #include <mqueue.h>
  2. int mq_close(mqd_t mqdes);

参数:
mqdes:就是由 mq_open 创建的描述符。
返回值:
成功返回 0,出错返回 -1。

  • mq_unlink 函数

可以使调用进程不在使用该描述符,但其消息队列并不从系统中删除。一个进程终止时,它的所有打开着的消息队列都关闭, 就像调用了 mq_close 一样。要从系统中删除 mq_open 创建的 IPC 名字,必须调用 mq_unlink。其函数声明如下:

  1. #include <mqueue.h>
  2. int mq_unlink(const char *name);

参数:
name:就是 mq_open 时指定的 IPC 名字。
返回值:
成功返回 0,出错返回 -1。
每个消息队列有一个保存其当前打开着描述符数的引用计数器(就像文件一样),因而本函数能够实现类似于 unlink 函数删除一个文件的机制:当一个消息队列的引用计数仍大于 0,其 name 就能删除,但是该队列的析构要到最后一个 mq_close 发生时才进行。
之前提过 Posix 消息队列至少具备随内核的持续性。所以即使当前没有进程打开着某个消息队列,该队列及其上的各个消息也将一直存在,直到调用 mq_unlink 并最后一个进程关闭该消息队列时,将会被删除。
看如下实例:

  1. #include <fcntl.h>
  2. #include <sys/stat.h>
  3. #include <mqueue.h>
  4. #include <unistd.h>
  5. #include <stdio.h>
  6. #include <stdlib.h>
  7. #define FILE_MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)
  8. int main(int argc, char **argv)
  9. {
  10. int c, flags;
  11. mqd_t mqd;
  12. flags = O_RDWR | O_CREAT;
  13. while ((c = getopt(argc, argv, "e")) != -1)
  14. {
  15. switch (c)
  16. {
  17. case 'e':
  18. flags |= O_EXCL;
  19. break;
  20. default:
  21. break;
  22. }
  23. }
  24. if (optind != argc - 1)
  25. {
  26. printf("usage: mq_create[-e] <name>");
  27. exit(1);
  28. }
  29. mqd = mq_open(argv[optind], flags, FILE_MODE, NULL);
  30. mq_close(mqd);
  31. return 0;
  32. }

在编译时报 undefined reference to mq_open、undefined reference to mq_close 时,除了要包含头文件 #include #include 外,还需要加上编译选项 -lrt
注:UNP 卷 2 描述创建的消息队列在 /tmp 目录下,然而实际在 Linux 上操作的时候发现该目录下没有新创建的消息队列。
再来看 mq_unlink 实例:

  1. #include <fcntl.h>
  2. #include <sys/stat.h>
  3. #include <mqueue.h>
  4. #include <unistd.h>
  5. #include <stdio.h>
  6. #include <stdlib.h>
  7. #define FILE_MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)
  8. int main(int argc, char **argv)
  9. {
  10. if (argc != 2)
  11. {
  12. printf("usage:mqunlink <name>");
  13. exit(1);
  14. }
  15. if (mq_unlink(argv[1]) == 0)
  16. {
  17. printf("unlink mq:%s succ", argv[1]);
  18. }
  19. return 0;
  20. }

mq_getattr、mq_setattr 函数

每个消息队列有四个属性,mq_getattr 返回所有这些属性,mq_setattr 则设置其中某个 属性。函数原型如下:

  1. #include <mqueue.h>
  2. int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
  3. int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr, struct mq_attr *oldattr);

参数说明:
mqdes:使用 mq_open 创建的描述符。
mq_attr:结构包含一些属性,如下:

  1. struct mq_attr
  2. {
  3. long mq_flags; /* Message queue flags (0, O_NONBLOCK) */
  4. long mq_maxmsg; /* Maximum number of messages. */
  5. long mq_msgsize; /* Maximum message size. */
  6. long mq_curmsgs; /* Number of messages currently queued. */
  7. };

mq_open 的第四个参数可以使用 mq_attr 结构体传递,允许在创建一个新队列时,给它指定 mq_maxmsg 和 mq_msgsize 属性,而 mq_open 会忽略另外两个成员。
返回值:
成功返回 0,出错返回 -1。
mq_getattr 用于将所指定消息队列的当前属性填充到由attr指向的结构。
mq_setattr 给所指定消息队列设置属性,但只使用由 attr 指向的 mq_attr 结构体的 mq_flags 成 员,以设置或清除非阻塞标志,其他成员被忽略,因为每个队列的最大消息数和 每个消息的最大字节数只能在创建队列时设置,队列中的当前消息数则只能获取而不能设 置。
如果 oldattr 指针非空,那么所指定队列的属性(mq_flags、mq_maxmsg 和 mq_msgsize)和当前状态(mq_curmsgs)将返回到由该指针指向的结构体中。
如下实例程序

  1. int main(int argc, char **argv)
  2. {
  3. mqd_t mqd;
  4. struct mq_attr attr;
  5. if (argc != 2)
  6. {
  7. printf("usage: mqgetattr <name>");
  8. exit(1);
  9. }
  10. mqd = mq_open(argv[1], O_RDONLY);
  11. mq_getattr(mqd, &attr);
  12. printf("max msgs = %ld, max bytes/msg = %ld, currently on queue = %ld\n",
  13. attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);
  14. mq_close(mqd);
  15. exit(0);
  16. }

mq_send、mq_reveive 函数

这两个函数分别用于往一个队列中放置一个消息和从一个队列中取走一个消息。每个消息有一个优先级,它是一个小于 MQ_PRIO_MAX 的无符号整数。Posix 要求这个上限至少为 32。
mq_receive 总是返回所指定队列中最高优先级的最早消息,而且该优先级能随该消息的内容及其长度一同返回。
这两个函数的声明如下:

  1. #include <mqueue.h>
  2. int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio);
  3. ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);

这两个函数的前三个参数分别与write和read的前三个参数类似。
参数:
mqdes:由 mq_open 创建的 IPC 描述符。
msg_ptr:指向缓冲区的指针。
msg_len:缓冲区的大小。需要注意的是:其值不能小于能加到所指定队列中的消息的最大大小(该队列 mq_attr 结构的 mq_msgsize 成员)。若 len 小于该值,mq_receive 将立即返回 EMSGSIZE 错误。所以使用 Posix 消息队列时一般在打开某个队列后调用 mq_getattr 确定最大消息大小,然后分配一个或多个那样大小的读缓冲区,要求每个缓冲区总是足以存放队列中的任意消息,mq_receive 就不必返回消息是否大于缓冲区的通知。0 字节长度的消息是允许的。mq_receive 的返回值是所接收消息中的字节数(如果成功)或 −1(如果出错),因此返回值为 0 表示返回长度为 0 的消息。
msg_prio:待发送消息的优先级,其值必须小于MQ_PRIO_MAX。如果 mq_receive 的 priop 参数是一个非空指针,所返回消息的优先级就存放在改指针所指空间。如果不必使用优先级不同的消息,那就给 mq_send 指定值为 0 的优先级,给 mq_receive 指定一 个空指针作为其最后一个参数。
返回值:
mq_send 若成功则为 0,若出错则为 −1。
mq_receive 若成功则为消息中字节数,若出错则为 −1。
注意:Posix 消息队列和大多数IPC消息机制并不标识发送者,这个信息在某些场合可能有用,然而使用 Unix 域套接字时可以提供发送者标识。
mq_send 实例如下:

  1. int main(int argc, char **argv)
  2. {
  3. mqd_t mqd;
  4. void *ptr;
  5. size_t len;
  6. unsigned int prio;
  7. if (argc != 4)
  8. {
  9. printf("usage: mqsend <name> <#bytes> <priority>");
  10. exit(1);
  11. }
  12. len = atoi(argv[2]);
  13. prio = atoi(argv[3]);
  14. mqd = mq_open(argv[1], O_WRONLY);
  15. ptr = calloc(len, sizeof(char));
  16. mq_send(mqd, (char *)ptr, len, prio);
  17. exit(0);
  18. }

mq_receive 实例如下:

  1. int main(int argc, char **argv)
  2. {
  3. int c, flags;
  4. mqd_t mqd;
  5. ssize_t n;
  6. unsigned int prio;
  7. void *buff;
  8. struct mq_attr attr;
  9. flags = O_RDONLY;
  10. while ((c = getopt(argc, argv, "n")) != -1)
  11. {
  12. switch (c)
  13. {
  14. case 'n':
  15. flags |= O_NONBLOCK;
  16. break;
  17. }
  18. }
  19. if (optind != argc - 1)
  20. {
  21. printf("usage: mqreceive [ -n ] <name>");
  22. exit(1);
  23. }
  24. mqd = mq_open(argv[optind], flags);
  25. mq_getattr(mqd, &attr);
  26. buff = malloc(attr.mq_msgsize);
  27. n = mq_receive(mqd, (char *)buff, attr.mq_msgsize, &prio);
  28. printf("read %ld bytes, priority = %u\n", (long)n, prio);
  29. exit(0);
  30. }

消息队列限制

我们已遇到任意给定队列的两个限制,它们都是在创建该队列时建立的:
mq_mqxmsg 队列中的最大消息数。
mq_msgsize 给定消息的最大字节数。
这两个值都没有内在的限制,消息队列的实现定义了另外两个限制:
MQ_OPEN_MAX 一个进程能够同时拥有的打开着消息队列的最大数目(Posix 要求它至少为8)。
MQ_PRIO_MAX 任意消息的最大优先级值加 1(Posix 要求它至少为 32)。
这两个常值往往定义在头文件中,也可以在运行时通过调用 sysconf 函数获取,如下示例:

  1. int main(int argc, char **argv)
  2. {
  3. printf("MQ_OPEN_MAX = %ld, MQ_PRIO_MAX = %ld\n",
  4. sysconf(_SC_MQ_OPEN_MAX), sysconf(_SC_MQ_PRIO_MAX));
  5. exit(0);
  6. }