1 System V 消息队列

消息队列在内核种的表示:
1547994142625.png

Note:使用ipcs命令可以查看当前系统存在system V的消息队列,共享内存和信号量信息。

1.1 消息队列相关函数

头文件:

  1. #lnclude <sys/types.h>
  2. #include <sys/ipc.h>
  3. #include <sys/msg.h>

msgget函数

函数定义: int msgget(key t key, Int msgflg)
功能: 创建一个新的消息队列; 返回值为消息队列标识,失败返回-1
参数:

  • key:消息队列的名字
  • msgflg:由九个权限标志组成,类似创建文件时的mode权限。创建队列需要加上IPC_CREAT | O_EXCL 标志。

msget的执行逻辑如下:
1547995758474.png

msgctl函数

函数定义: int msgctl(int msqid, int cmd, struct msqid_ds *buff)
功能: 控制消息队列; 成功返回0,失败返回-1
参数:

  • msqid:消息队列标识码
  • cmd:要采取的操作,可取值如下 | 命令 | 说明 | | :—- | —- | | IPC_STAT | 获取当前消息队列的信息,存到buff返回参数中 | | IPC_SET | 设置消息队列,设置值存在buff中 | | IPC_RMID | 删除消息队列 |

msgsnd函数

函数定义: int msgsnd(int msq id, const void *msgp, size_t msgsz, int msgflg)
功能: 发送消息到消息队列; 成功返回0,失败返回-1
参数:

  • msgp:指针,指向要发送的消息
  • msgsz:要发送消息的长度,不包含消息类型的长度
  • msgflg:控制队列满或达到系统上限时的行为。IPC_NOWAIT(消息队列满时不等待,直接返回EAGAIN错误)

    msgrcv函数

    函数定义: ssize_t msgrcv(lnt msqld, void *msgp, Slze_t msgsz, long msgtype, int msgflg)
    功能: 从消息队列获取一条消息; 成功返回接收的字符个数,失败返回-1
    参数:

  • msgp:指针,指向用于接收消息的buff

  • msgsz:接收buff的长度
  • msgtype:实现接收优先级 | type取值 | 优先级 | | —- | —- | | type = 0 | 返回队列第一条消息 | | type > 0 | 返回队列第一条类型等于type的消息 | | type < 0 | 返回队列中第一条类型小于等于type绝对值的消息 |
  • msgflg:控制队列满或达到系统上限时的行为 | flag取值 | 含义 | | —- | —- | | IPC_NOWAIT | 队列没有可读消息时不等待,直接返回EMOMSG错误 | | MSG_NOERROR | 消息大小超过msgsz时截断消息 | | type>0 && flag=MSC_EXCEPT | 接收类型不等于type的第一条消息 |

1.2 代码实例

  1. int len = atoi(argv[1]);
  2. int type = atoi(argv[2]);
  3. int msgid = msgget(1234, 0666 | IPC_CREAT);
  4. if (msgid == -1)
  5. {
  6. msgid = msgget(1234, 0);
  7. if (msgid == -1)
  8. ERR_EXIT("msgget failed");
  9. }
  10. msgbuf* ptr = (msgbuf*)malloc(sizeof(long) + len);
  11. ptr->mtype = type;
  12. if (msgsnd(msgid, ptr, len, 0) < 0)
  13. ERR_EXIT("msgsnd failed");
  14. //=======================================
  15. int msgid;
  16. int rcvsize = 0;
  17. msgid = msgget(1234, 0);
  18. if (msgid == -1)
  19. ERR_EXIT("msgget failed");
  20. msgbuf* ptr = (msgbuf*)malloc(sizeof(long) + MSGMAX);
  21. ptr->mtype = type;
  22. if ((rcvsize = msgrcv(msgid, ptr, MSGMAX, type, flag)) < 0)
  23. ERR_EXIT("msgrcv failed");
  24. printf("read %d bytes from msg queue\r\n", rcvsize);

1.3 消息队列实现服务器客户端

发送和接收是在不同的队列中,可以避免死锁。
imgclip1.png

2 Posix消息队列

2.1 posix消息队列函数

统一头文件:

  1. #include <fcntl.h> /* For O_* constants */
  2. #include <sys/stat.h> /* For mode constants */
  3. #include <mqueue.h>

mq_open

函数定义:

  1. mqd_t mq_open(const char *name, int oflag);
  2. mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);

功能: 创建和访问一个消息队列
返回值: 成功返回消息队列文件描述符,失败返回-1
参数:

  • name:消息队列名字, 必须以/开头
  • oflag:与open函数类似,指定权限
  • mode:如果oflag指定为O_CREAT,需要设置mode
  • att:消息队列的属性,指定NULL则使用默认属性

Note:无法使用ipcs查看posix的消息队列。需要挂载mqueue才能看到POSIX创建的消息队列。

  1. # mkdir /dev/mqueue
  2. # mount -t mqueue none /dev/mqueue

mq_close

函数定义:int mq_close(mqd_t mqdes);
功能: 关闭一个消息队列
返回值: 成功返回0,失败返回-1
参数:

  • mqdes:消息队列描述符

    mq_unlink

    函数定义:int mq_unlink(const char *name);
    功能: 删除一个消息队列
    返回值: 成功返回0,失败返回-1
    参数:

  • name:消息队列的名字

    mq_getattr和mq_setattr

    函数定义:

    1. int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
    2. int mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);

    功能: 获取和设置一个消息队列的属性
    返回值: 成功返回0,失败返回-1
    参数:

  • mqdes:消息队列描述符

  • attr: 队列属性结构体

    mq_send

    函数定义:int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);
    功能: 向消息队列发送一条消息
    返回值: 成功返回0,失败返回-1
    参数:

  • msg_ptr:指向消息的指针

  • msg_len:消息的长度
  • msg_prio:消息的优先级

    mq_receive

    函数定义:ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
    功能: 从消息队列接收一条消息
    返回值: 成功返回接收到的字节数,失败返回-1
    参数:

  • msg_ptr:接收消息的指针

  • msg_len:接收的消息长度
  • msg_prio: 接收到额消息的优先级

注意:该函数总是返回消息队列中最高优先级的消息。

mq_notify

函数定义:int mq_notify(mqd_t mqdes, const struct sigevent *sevp);
功能: 建立或删除消息到达的通知事件
返回值: 成功返回0,失败返回-1
参数:

  • sevp:两个取值
    • NULL: 撤销已注册的通知
    • sevp->sigev_notify = SIGEV_SIGNAL:产生一个信号(具体哪个信号用sevp->sigev_signo指定)通知有消息到达
    • sevp->sigev_notify = SIGEV_THREAD:创建一个线程执行指定的函数(具体哪个函数用sevp->sigev_notify_function指定)

2.2 代码示例

mq_open

  1. #include "common.h"
  2. #include <unistd.h>
  3. #include <sys/types.h>
  4. #include <stdlib.h>
  5. #include <stdio.h>
  6. #include <fcntl.h> /* For O_* constants */
  7. #include <sys/stat.h> /* For mode constants */
  8. #include <mqueue.h>
  9. #include <string.h>
  10. struct student
  11. {
  12. char name[32];
  13. int age;
  14. };
  15. int main()
  16. {
  17. mqd_t mqid;
  18. mqid = mq_open("/abc", O_CREAT | O_RDWR, 0666, NULL);
  19. /** mqid = mq_open("/abc", O_RDWR, 0666, NULL); */
  20. if (mqid == (mqd_t)-1)
  21. ERR_EXIT("mq_open failed");
  22. //获取消息队列参数
  23. mq_attr attr;
  24. if (mq_getattr(mqid, &attr) == -1)
  25. ERR_EXIT("mq_getattr failed");
  26. printf("[mq_getattr:%d] mq_flags:%d, mq_maxmsg:%d, mq_msgsize:%d, mq_curmsgs:%d\r\n", __LINE__, attr.mq_flags, attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);
  27. //发送消息到消息队列
  28. student stu = {"barret", 29};
  29. if(mq_send(mqid, (const char*)&stu, sizeof(stu), 1) == -1)
  30. ERR_EXIT("mq_send failed");
  31. if (mq_getattr(mqid, &attr) == -1)
  32. ERR_EXIT("mq_getattr failed");
  33. printf("[mq_getattr:%d] mq_flags:%d, mq_maxmsg:%d, mq_msgsize:%d, mq_curmsgs:%d\r\n", __LINE__, attr.mq_flags, attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);
  34. //读取消息队列中消息
  35. student rcvstu;
  36. unsigned prio;
  37. if(mq_receive(mqid, (char*)&rcvstu, attr.mq_msgsize, &prio) == -1)
  38. ERR_EXIT("mq_receive failed");
  39. printf("[mq_receive:%d] name:%s, age:%d, prio:%d\r\n", __LINE__, rcvstu.name, rcvstu.age, prio);
  40. if (mq_getattr(mqid, &attr) == -1)
  41. ERR_EXIT("mq_getattr failed");
  42. printf("[mq_getattr:%d] mq_flags:%d, mq_maxmsg:%d, mq_msgsize:%d, mq_curmsgs:%d\r\n", __LINE__, attr.mq_flags, attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);
  43. mq_close(mqid); //关闭队列
  44. mq_unlink("/abc");//删除消息队列
  45. return 0;
  46. }

mq_notify

  1. #include "common.h"
  2. #include <unistd.h>
  3. #include <sys/types.h>
  4. #include <stdlib.h>
  5. #include <stdio.h>
  6. #include <fcntl.h> /* For O_* constants */
  7. #include <sys/stat.h> /* For mode constants */
  8. #include <mqueue.h>
  9. #include <string.h>
  10. #include <signal.h>
  11. struct student
  12. {
  13. char name[32];
  14. int age;
  15. };
  16. mqd_t mqid;
  17. void handleSignal(int sig)
  18. {
  19. mq_attr attr;
  20. if (mq_getattr(mqid, &attr) == -1)
  21. ERR_EXIT("mq_getattr failed");
  22. //读取消息队列中消息
  23. student rcvstu;
  24. unsigned prio;
  25. if(mq_receive(mqid, (char*)&rcvstu, attr.mq_msgsize, &prio) == -1)
  26. ERR_EXIT("mq_receive failed");
  27. printf("[mq_receive:%d] name:%s, age:%d, prio:%d\r\n", __LINE__, rcvstu.name, rcvstu.age, prio);
  28. }
  29. int main()
  30. {
  31. mqid = mq_open("/abc", O_CREAT | O_RDWR, 0666, NULL);
  32. /** mqid = mq_open("/abc", O_RDWR, 0666, NULL); */
  33. if (mqid == (mqd_t)-1)
  34. ERR_EXIT("mq_open failed");
  35. signal(SIGUSR1, handleSignal);
  36. //注册消息通知
  37. sigevent evt;
  38. evt.sigev_notify = SIGEV_SIGNAL;
  39. evt.sigev_signo = SIGUSR1;
  40. mq_notify(mqid, &evt);//使用信号接收新消息通知
  41. //发送消息到消息队列
  42. student stu = {"barret", 29};
  43. if(mq_send(mqid, (const char*)&stu, sizeof(stu), 1) == -1)
  44. ERR_EXIT("mq_send failed");
  45. while(1)
  46. {
  47. pause();
  48. }
  49. mq_close(mqid); //关闭队列
  50. mq_unlink("/abc");//删除消息队列
  51. return 0;
  52. }