分享

Posix Message Queues 笔记

 WUCANADA 2013-09-11

Posix Message Queues 笔记

■ 调用接口

#include <fcntl.h> /* For O_* constants */
#include <sys/stat.h> /* For mode constants */
#include <mqueue.h>

Link with -lrt.

mqd_t mq_open(const char *name, int oflag);
mqd_t mq_open(const char *name, int oflag, mode_t mode,
struct mq_attr *attr);


mqd_t mq_close(mqd_t mqdes);
mqd_t mq_unlink(const char *name);


mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr);
mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr,
struct mq_attr *oldattr);


mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr);
mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr,
struct mq_attr *oldattr);


mqd_t mq_notify(mqd_t mqdes, const struct sigevent *notification);

union sigval { /* Data passed with notification */
int sival_int; /* Integer value */
void *sival_ptr; /* Pointer value */
};

struct sigevent {
int sigev_notify; /* SIGEV_{NONE,SIGNAL,THREAD} */
int sigev_signo; /* Notification signal */
union sigval sigev_value; /* Data passed with
notification */
/* following two if SIGEV_THREAD */
void (*sigev_notify_function) (union sigval);
/* Function for thread
notification */
void *sigev_notify_attributes;
/* Thread function attributes */
};


ssize_t mq_receive(mqd_t mqdes, char *msg_ptr,
size_t msg_len, unsigned *msg_prio);
mqd_t mq_send(mqd_t mqdes, const char *msg_ptr,
size_t msg_len, unsigned msg_prio);

■ 与管道的不同:
   1. 读消息与写消息是相互独立的,而管道,读者与写者必须同时出现。
   2. 消息队列生存期是内核级别的,而管道是进程级别的,当进程终止时,管道里所有数据都被丢弃。


■ 与system v message queue 相比较,IPC message queue有以下两个主要不同点:
    1.读IPC message queue总是返回最高优先级的最早放入的消息。
    读system v message queue可以返回任意优先级的消息。
    2.当消息投放到空的IPC message queue时,可以生成一个信号或启动一个线程。
   
■ 一个消息由3部分组成: 数据、长度以及优先级。

■ mq_open与文件的open函数类似,但多了一个属性设置的参数。需要注意的有两点:
    1.属性(struct mq_attr)中的mq_maxmsg和mq_msgsize参数只能在队列创建时设置(调用 mq_setattr 设置无效),并且两者必须同时指定(不能随意更改队列大小、消息大小);而mq_flags(O_NONBLOCK)可以通过mq_open或者 mq_setattr来设置。
    2.队列名的指定应该要以"/"开头,而且应该名字中只有一个"/"。(其他 Posix IPC 对象同样如此)

■ mq_send与mq_receive和文件的读写函数类似,但多了一个优先级相关的参数。同样
有3点要注意:
    1.不需要为消息指定优先级时,mq_send 时该参数总是设为0。
    2.允许长度为0的消息。
    3.当队列满或者空时,mq_send或mq_receive默认会阻塞。
    4.mq_receive 中指定的缓冲区的长度至少要和消息队列消息的最大长度相等(或者大于),否则返回
      EMSGSIZE。这也就意味着通常使用 mq_attr 结构体 的 mq_msgsize 成员来指定mq_receive 中缓冲区
      的长度。

■ 上面说,当消息投放到空的IPC message queue时,可以生成一个信号或启动一个线程。
IPC message queue通过 mq_notify 函数提供了异步通知机制,通过在进程中调用
mq_notify,称为将进程注册到消息队列的通知中。有几点需要注意;
    1.对于给定的消息队列,任一时刻只能有一个进程被注册而允许通知。
    2.当已注册的进程因mq_receive而阻塞时,通知不发送。mq_receive的优先级
    高于通知。(也就是,mq_receive不能被打断)
    3.当收到通知后,进程的注册立即取消。如果需要,进程必须再次注册。

由于阻塞模下,当消息队列空时,调用 mq_receive 会阻塞,因而通常使用

“ signal notification with nonblocking mq_receive ” 或 “ signal notification(using sigwait instead of signal handler)with nonblocking mq_receive " 或 ” initiate thread “ 。    等方法来读取消息。

(请参看 UNPv2 94页、96页、99页)

■ 限制
除了 mq_attr 结构中定义的两个限制 mq_maxmsg 和 mq_msgsize 外,实现还定义了两个限制:MQ_OPEN_MAX 和 MQ_PRIO_MAX。

mq_maxmsg 和 mq_msgsize 没有什么内在限制,理论上可以达到 长整形的最大值。

MQ_OPNE_MAX 系统能打开的最大 消息队列数
MQ_PRIO_MAX 消息的最大优先数加1 。
上面这两个常量通常定义在 <unistd.h> 中,可以通过 sysconf 查看。


可以通过 /proc 文件 来指定系统对消息队列的限制值。
$ cat /proc/sys/fs/mqueue/msgsize_max
8192
$ cat /proc/sys/fs/mqueue/queues_max
256
$ cat /proc/sys/fs/mqueue/msg_max
10

也可以在 /etc/sysctl.conf 中添加配置,在 reboot 时设置。
fs.mqueue.msg_max=1000
fs.mqueue.msgsize_max=8192

■ 其他 (zz)

>Hi,
>
>I hope this is the appropriate group to post this question. If I'm
>wrong, please accept my apologies and correct me.
>
>Are there standard utilities for listing/removing POSIX IPC objects,
>equivalent to ipcs/ipcrm for SysV IPC ?

Gidday Paulo,

No, unfortunately, there are not. On many Unix implementations
(including Linux), POSIX IPC objects are implemented in some sort of
virtual file system, and you can thus just use ls(1) and rm(1).
However, the precise location of these virtual file systems varies
across implementations. (SUSv3 does not help here: it doesn't specify
how to list/remove POSIX IPC objects from the command line, not
specify where they should be located.)

Cheers,

Michael

PS On Linux, POSIX shared memory and semaphore objects are typically
under /dev/shm. For message queues you need to mount the file system
using something like:

$ mkdir /dev/mqueue
$ mount -t mqueue none /dev/mqueue

Linux POSIX Message Queue

Contents

 [hide

Introduction

Linux POSIX Message Queue


An overview of the POSIX message queue can be found at mq_overview

Prerequisite

For POSIX message queue to work, following points to be taken care

  • Linking the source binary should be done with Real-time library i.e with gcc option -lrt
  • Kernel should be built with configuration option CONFIG_POSIX_MQUEUE enabled.
  • POSIX Message queue support is available from Linux kernel 2.6.6 and Glibc version 2.3.4.

Library APIs

Following header files need to be included for POSIX message queue

#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>

mq_open()

Creates a new message queue or opens an existing queue. ManPage

mq_close()

Closes a particular message queue previously opened by the same process. ManPage

mq_send()

Sends a message to the queue. ManPage

mq_timedsend()

Sends a message to the queue, and if the message queue if full, waits till time-out specified. ManPage

mq_receive()

Receive the message from a message queue. ManPage.

  • Receive the oldest message with the highest priority from the queue.
  • If currently no message is available in the queue
    • Return immediatley with failure if queue was created with O_NONBLOCK flag.
    • Blocks until
      • New message is available in the queue, or
      • The call is interrupted by a signal handler.
 #include <mqueue.h>

 ssize_t mq_receive(mqd_t mqdes, char *msg_ptr,
                    size_t msg_len, unsigned *msg_prio); 

 mqdes       - Message queue descriptor.
 msg_ptr     - Address of the buffer to which the message to be stored.
 msg_len     - Size of the above buffer. 
               This value should be greater than the mq_msgsize attribute 
                          (see mq_getattr()) of the queue.
 msg_prio    - Non-NULL : Address of unsigned variable to which the priority 
                          of the received message to be stored.
               NULL     : Priority value is not returned.

 Return value - On Success  : Number of bytes in the received message.
              - On falilure : -1

mq_timedreceive()

Reads a message from the queue with a timeout. ManPage

  • Receive the oldest message with the highest priority from the queue.
  • If currently no message is available in the queue
    • Return immediately with failure if queue was created with O_NONBLOCK flag.
    • Blocks until
      • New message is available in the queue, or
      • Timeout value is reached, or
      • The call is interrupted by a signal handler.
 #include <time.h>
 #include <mqueue.h>
  
 ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr,
	size_t msg_len, unsigned *msg_prio,
	const struct timespec *abs_timeout);

 mqdes       - Message queue descriptor.
 msg_ptr     - Address of the buffer to which the message to be stored.
 msg_len     - Size of the above buffer. 
               This value should be greater than the mq_msgsize attribute 
                          (see mq_getattr()) of the queue.
 msg_prio    - Non-NULL : Address of unsigned variable to which the priority 
                          of the received message to be stored.
               NULL     : Priority value is not returned.
 abs_timeout - Pointer to the timespec structure, which represents the 
                          absolute time (time since the Epoch time, 1970-01-01 
                          00:00:00 +0000 (UTC)) up to which this call will 
                          block if the message queue is empty.

                          struct timespec {
                             time_t tv_sec;  // Seconds since the Epoch time
                             long   tv_nsec; // Nanoseconds time fraction from 
                                             // above mentioned second value.
                          };

 Return value - On Success  : Number of bytes in the received message.
              - On failure : -1

Normally the timespec structure value is formed by

  • Getting the current time using clock_gettime(CLOCK_REALTIME, &tp) function.
  • Add the relative timeout value to this timespec structure

mq_notify()

Registers or unregisters delivery of an asynchronous notification when a new message arrives on the empty message queue. ManPage

mq_unlink()

Removes a message queue, and mark the message queue for deletion when all the processes closes it. ManPage

mq_setattr()

Sets message queue attributes such as maximum number of messages on queue, maximum message size etc. ManPage

mq_getattr()

Gets message queue attributes. ManPage

Linux POSIX Message Queue Example

Simple message queue send receive

View Source code : Click Here

Code can be compiled and tested by running following commands

$ git clone https://github.com/linuxpedia/linuxpedia_examples.git
$ cd linuxpedia_examples/system_programming/message_queue/mq_sendreceive/
$ make
$ ./receive.out
 Message Queue Opened
 Receiving message ....
 Successfully received 12 bytes
 Message : hello world
 Message Queue Closed
 Message Queue unlinked

Open another shell and run following command

$ ./send.out
 Message Queue Opened
 Sending message.... Done
 Message Queue Closed

Message queue receive with timeout

View Source code : Click Here

Code can be compiled and tested by running following commands

$ git clone https://github.com/linuxpedia/linuxpedia_examples.git
$ cd linuxpedia_examples/system_programming/message_queue/mq_timedreceive/
$ make
$ ./runtest.sh

Click here to go back to the page Message Queue

--Anooj Gopi 20:32, 22 September 2012 (MDT)


    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多