概念
可以定义任意数量的邮箱(仅受可用 RAM 的限制)。每个邮箱都由其内存地址引用。
邮箱具有以下主要属性:
- 已发送但尚未接收的消息的发送队列。
- 等待接收消息的线程的接收队列。
必须先初始化邮箱,然后才能使用。这会将其两个队列设置为空。
邮箱允许线程
交换邮件。发送消息的线程称为发送线程
,而接收消息的线程称为接收线程
。每条消息只能由一个线程接收,不支持点对多点的广播消息。
使用邮箱交换的邮件以非匿名
方式处理,参与交换的两个线程都对方线程的标识。
邮件描述符
邮件描述符
是一种数据结构,它指定邮件数据所在的位置以及邮箱如何处理邮件。发送线程和接收线程在访问邮箱时都提供邮件描述符
。邮箱使用邮件描述符在兼容的发送和接收线程之间执行邮件交换。邮箱还会在交换期间
更新某些邮件描述符字段,使两个线程都知道发生了什么。
邮箱邮件包含零个
或多个字节
的邮件数据
。消息数据的大小和格式是应用程序定义的,并且可能因消息而异。有两种形式的消息数据:
消息缓冲区
是由发送或接收消息的线程提供的内存区域。数组或结构变量通常可用于此目的。- 消息块是从内存池中分配的内存区域。
消息不能不同时具有消息缓冲区
和消息块
。两种消息数据都不具有的消息称为空消息
。
消息生命周期
消息的生命周期很简单。当发送线程将邮件提供给邮箱时,将创建邮件。然后,邮件归邮箱所有,直到将其提供给接收线程。接收线程可以在从邮箱接收邮件时检索邮件数据
。仅当发生数据检索时,邮件才会被邮箱删除。
线程兼容性
发送线程可以指定消息发送到的线程的地址
,也可以通过指定K_ANY
来将其发送到任何线程。同样,接收线程可以指定它希望从中接收消息的线程的地址,也可以通过指定K_ANY
来接收来自任何线程的消息。仅当同时满足发送线程和接收线程的要求时,才交换消息。
例如,如果线程 A 将消息发送到线程 B(并且仅向线程 B),则当线程 B 尝试从线程 A 接收消息或线程 B 尝试从任何线程接收消息时,线程 B 将接收该消息。如果线程 B 尝试从线程 C 接收消息,则不会发生交换。
消息流控制
邮箱邮件可以同步
或异步
交换。
在同步交换中,发送线程将阻塞
,直到消息被接收线程完全处理。
在异步交换中,发送线程不会等到消息被另一个线程接收后再继续。这允许发送线程在将消息提供给接收线程并完全处理之前执行其他工作(例如收集将在下一条消息中使用的数据)。
消息交换的技术由发送线程确定。
同步交换技术提供了一种隐式形式
的流控制,可防止发送线程生成消息的速度快于接收线程使用消息的速度。
异步交换技术提供了一种显式形式
的流控制,它允许发送线程在发送后续消息之前确定以前发送的消息是否仍然存在。
定义邮箱
邮箱是使用类型为k_mbox
的变量定义的。然后必须通过调用k_mbox_init()
对其进行初始化。
struct k_mbox my_mailbox;
k_mbox_init(&my_mailbox);
也可以通过调用K_MBOX_DEFINE
让邮箱在编译时定义和初始化。
K_MBOX_DEFINE(my_mailbox);
邮件描述符的字段
邮件描述符是k_mbox_msg
类型的结构。只应使用下面列出的字段;任何其他字段仅供邮箱内部使用。
字段 | 描述 |
---|---|
info | 一个 32 位值,由消息发送方和接收方交换定义,其含义由应用程序定义。此交换是双向的,允许发送方在任何消息交换期间向接收方传递值,并允许接收方在同步消息 交换期间向发送方传递值。 |
size | 消息数据大小,以字节为单位。在发送空消息 时或者在发送没有实际数据的消息缓冲区 或消息块 时,将其设置为零。接收消息时,请将其设置为所需的最大数据量 ,如果不需要消息数据,则将其设置为零。邮箱 使用收到邮件后交换的实际数据字节数更新此字段。 |
tx_data | 指向发送线程的消息缓冲区 的指针。当发送内存块或发送空消息时将其设置为NULL。接收邮件时,请保持此字段未初始化状态。 |
tx_block | 发送线程的内存块 的描述符。在发送空消息时将tx_block.data 设置为NULL。在发送消息缓冲区 或接收消息 时,请保持此字段未初始化状态。 |
tx_target_thread | 接收线程的地址。将其设置为K_ANY 表示允许任何线程接收消息。接收邮件时,请保持此字段未初始化状态。收到邮件后,邮箱将使用实际收件人的地址更新此字段。 |
rx_source_thread | 发送线程的地址。将其设置为K_ANY 表示接收任何线程发送的消息。发送邮件时,请保持此字段未初始化状态。将邮件放入邮箱时,邮箱将使用此字段,使其具有实际发件人的地址 |
发送消息
线程通过首先创建其消息数据
来发送消息。当数据量较小,并且复制数据的成本低于分配和释放消息块的成本时,通常使用消息缓冲区。
接下来,发送线程创建一个邮件描述符
,用于描述要发送的消息的特征。
最后,发送线程调用邮箱发送 API 来启动邮件交换。如果有线程当前正在等待消息,则该消息将立即提供给兼容的接收线程。否则,邮件将添加到邮箱的发送队列中。
发送队列中可以同时存在任意数量的消息。发送队列中的消息根据发送线程的优先级进行排序。对具有相同优先级的消息进行排序,以便可以首先接收最旧的消息。
对于同步
发送操作,当接收线程同时接收消息并检索消息数据时,该操作通常会完成。如果在达到发送线程指定的等待时间时未读取邮件,则会从邮箱的发送队列中删除邮件,并且发送操作将失败。发送操作成功完成后,发送线程可以检查消息描述符
,以确定哪个线程接收了消息、交换了多少数据以及接收线程提供的应用程序定义的信息值。
同步发送操作可能会无限期地阻塞发送线程,即使线程指定了最长等待时间也是如此。等待时间仅限制邮箱在另一个线程收到邮件之前等待的时间。收到消息后,接收线程检索消息数据所需的时间没有限制。
对于异步发送操作,该操作始终立即完成。这允许发送线程继续处理,而不管消息是立即提供给接收线程还是添加到发送队列。发送线程可以选择指定邮箱在邮箱删除邮件时提供的信号量
。使用信号量允许发送线程轻松实现流控制机制,该机制确保邮箱在任何时间点保存的邮件数不超过应用程序指定的发送线程的邮件数。
异步发送消息的线程无法确定哪个线程接收了消息、交换了多少数据或接收线程提供的应用程序定义的信息值。
发送空消息
此代码使用邮箱将4字节
随机值同步传递给任何需要4字节
随机值的接收线程
。消息info
字段足够大,可以承载正在交换的信息,因此不需要使用消息的数据部分。
void producer_thread(void)
{
struct k_mbox_msg send_msg;
while (1) {
/* generate random value to send */
uint32_t random_value = sys_rand32_get();
/* prepare to send empty message */
send_msg.info = random_value;
send_msg.size = 0;
send_msg.tx_data = NULL;
send_msg.tx_block.data = NULL;
send_msg.tx_target_thread = K_ANY;
/* send message and wait until a consumer receives it */
k_mbox_put(&my_mailbox, &send_msg, K_FOREVER);
}
}
使用消息缓冲区发送数据
此代码使用邮箱将可变大小的数据从生产线程同步传递到任何需要它的接收线程。消息info
字段用于交换接收和发送线程可以处理的最大消息缓冲区的大小。
void producer_thread(void)
{
char buffer[100];
int buffer_bytes_used;
struct k_mbox_msg send_msg;
while (1) {
/* generate data to send */
...
buffer_bytes_used = ... ;
memcpy(buffer, source, buffer_bytes_used);
/* prepare to send message */
send_msg.info = buffer_bytes_used;
send_msg.size = buffer_bytes_used;
send_msg.tx_data = buffer;
send_msg.tx_block.data = NULL;
send_msg.tx_target_thread = K_ANY;
/* send message and wait until a consumer receives it */
k_mbox_put(&my_mailbox, &send_msg, K_FOREVER);
/* info, size, and tx_target_thread fields have been updated */
/* verify that message data was fully received */
if (send_msg.size < buffer_bytes_used) {
printf("some message data dropped during transfer!");
printf("receiver only had room for %d bytes", send_msg.info);
}
}
}
使用消息块发送数据
此代码使用邮箱发送异步邮件
。信号量
用于在使用前一条消息之前阻止
新消息的发送,以便接收线程无法跟上时不会积压消息。
消息数据存储在从内存池获取的内存块中,从而消除了交换大型消息时不需要的数据复制。内存池仅包含两个块:在处理先前发送的块时,新的块被数据填满
/* define a semaphore, indicating that no message has been sent */
K_SEM_DEFINE(my_sem, 1, 1);
/* define a memory pool containing 2 blocks of 4096 bytes */
K_MEM_POOL_DEFINE(my_pool, 4096, 4096, 2, 4);
void producer_thread(void)
{
struct k_mbox_msg send_msg;
volatile char *hw_buffer;
while (1) {
/* allocate a memory block to hold the message data */
k_mem_pool_alloc(&my_pool, &send_msg.tx_block, 4096, K_FOREVER);
/* keep overwriting the hardware-generated data in the block */
/* until the previous message has been received by the consumer */
do {
memcpy(send_msg.tx_block.data, hw_buffer, 4096);
} while (k_sem_take(&my_sem, K_NO_WAIT) != 0);
/* finish preparing to send message */
send_msg.size = 4096;
send_msg.tx_target_thread = K_ANY;
/* send message containing most current data and loop around */
k_mbox_async_put(&my_mailbox, &send_msg, &my_sem);
}
}
接收消息
线程通过首先创建一个邮件描述符
来接收消息,该描述符表征了它要接收的消息的特征。然后,它调用其中一个邮箱接收API。邮箱搜索其发送队列,并从找到的第一个兼容线程中获取邮件。如果不存在兼容的线程,接收线程可以选择等待。如果在达到接收线程指定的等待时间未出现兼容的线程,则接收操作将失败。接收操作成功完成后,接收线程可以检查邮件描述符,以确定哪个线程发送了消息、交换了多少数据以及发送线程提供的应用程序定义的信息值。
任意数量的接收线程都可以在邮箱的接收队列上同时等待。线程根据其优先级进行排序;对具有相同优先级的线程进行排序,以便首先开始等待的线程可以先接收消息。
接收线程既控制传入消息中检索的数据量,也控制数据的最终位置。线程可以选择获取消息中的所有数据,或仅获取数据的初始部分,或者根本不获取任何数据。同样,线程可以选择将数据复制到其选择的消息缓冲区中,也可以选择将数据放在消息块中。当涉及的数据量较小,并且复制数据的成本低于分配和释放内存池块的成本时,通常使用消息缓冲区。
接收时检索数据
线程检索消息数据的最直接方法是在收到消息时指定消息缓冲区。线程指示消息缓冲区的位置及其大小。
作为接收操作的一部分,邮箱将邮件的数据复制到邮件缓冲区。如果消息缓冲区不够大,无法包含消息的所有数据,则将丢失任何未复制的数据。如果消息不够大,无法用数据填充所有缓冲区,则消息缓冲区的未使用部分保持不变。在所有情况下,邮箱
都会更新接收线程的邮件描述符
,以指示复制了多少个数据字节。
即时数据检索技术最适合于事先知道消息最大大小的消息。
下面的代码使用邮箱来处理来自任何生成线程的可变大小的请求,并使用即时数据检索技术。消息info
字段用于交换发送和接收线程可以处理的最大消息缓冲区大小的信息。
void consumer_thread(void)
{
struct k_mbox_msg recv_msg;
char buffer[100];
int i;
int total;
while (1) {
/* prepare to receive message */
recv_msg.info = 100;
recv_msg.size = 100;
recv_msg.rx_source_thread = K_ANY;
/* get a data item, waiting as long as needed */
k_mbox_get(&my_mailbox, &recv_msg, buffer, K_FOREVER);
/* info, size, and rx_source_thread fields have been updated */
/* verify that message data was fully received */
if (recv_msg.info != recv_msg.size) {
printf("some message data dropped during transfer!");
printf("sender tried to send %d bytes", recv_msg.info);
}
/* compute sum of all message bytes (from 0 to 100 of them) */
total = 0;
for (i = 0; i < recv_msg.size; i++) {
total += buffer[i];
}
}
}
使用消息缓冲区检索数据
接收线程可以选择在接收消息时延迟
消息数据检索,以便以后可以将数据检索到消息缓冲区中。线程通过指定消息缓冲区位置和大小来指示它以后愿意检索的最大数据量来实现此目的。
邮箱不会在接收操作过程中复制任何邮件数据。但是,邮箱仍会更新接收线程的邮件描述符,以指示有多少数据字节可用于检索。
然后,接收线程必须按如下方式响应:
- 如果消息描述符大小为零,则发送方的消息不包含任何数据,接收线程不希望接收任何数据。接收线程不需要执行任何进一步的操作,因为邮箱已完成数据检索并删除了邮件。
- 如果消息描述符大小不为零,并且接收线程仍希望检索数据,则该线程必须调用
k_mbox_data_get()
并提供足够大的消息缓冲区来保存数据。邮箱将数据复制到邮件缓冲区并删除邮件。 - 如果消息描述符大小不为零,并且接收线程不想检索数据,则该线程必须调用
k_mbox_data_get()
,并将消息缓冲区指定为NULL 。邮箱将删除邮件而不复制数据。
延迟数据检索技术适用于不希望立即检索消息数据的应用程序。例如,当内存比较吃紧时,这是非常有用的。
下面的代码使用邮箱的延迟数据检索机制
,仅当邮件满足特定条件时,才从接收线程获取邮件数据,从而消除不需要的数据复制。发送线程提供的邮件info
字段用于对邮件进行分类。
void consumer_thread(void)
{
struct k_mbox_msg recv_msg;
char buffer[10000];
while (1) {
/* prepare to receive message */
recv_msg.size = 10000;
recv_msg.rx_source_thread = K_ANY;
/* get message, but not its data */
k_mbox_get(&my_mailbox, &recv_msg, NULL, K_FOREVER);
/* get message data for only certain types of messages */
if (is_message_type_ok(recv_msg.info)) {
/* retrieve message data and delete the message */
k_mbox_data_get(&recv_msg, buffer);
/* process data in "buffer" */
...
} else {
/* ignore message data and delete the message */
k_mbox_data_get(&recv_msg, NULL);
}
}
}
使用消息块检索数据
接收线程可以选择将消息数据检索到内存块中,而不是消息缓冲区中。这与将数据检索到消息缓冲区中的方式大致相同。接收线程首先接收没有其数据的消息,然后通过调用k_mbox_data_block_get()
来检索数据。邮箱填充接收线程提供的块描述符,允许线程访问数据。邮箱还会删除收到的邮件,因为数据检索已完成。然后,接收线程负责在不再需要数据时将消息块释放回内存池。
此技术最适合于使用内存块发送消息数据的应用程序。
下面的代码使用邮箱来接收使用内存块发送的邮件,从而消除了在处理大型邮件时不需要的数据复制。消息可以同步发送,也可以异步发送。
/* define a memory pool containing 1 block of 10000 bytes */
K_MEM_POOL_DEFINE(my_pool, 10000, 10000, 1, 4);
void consumer_thread(void)
{
struct k_mbox_msg recv_msg;
struct k_mem_block recv_block;
int total;
char *data_ptr;
int i;
while (1) {
/* prepare to receive message */
recv_msg.size = 10000;
recv_msg.rx_source_thread = K_ANY;
/* get message, but not its data */
k_mbox_get(&my_mailbox, &recv_msg, NULL, K_FOREVER);
/* get message data as a memory block and discard message */
k_mbox_data_block_get(&recv_msg, &my_pool, &recv_block, K_FOREVER);
/* compute sum of all message bytes in memory block */
total = 0;
data_ptr = (char *)(recv_block.data);
for (i = 0; i < recv_msg.size; i++) {
total += data_ptr++;
}
/* release memory block containing data */
k_mem_pool_free(&recv_block);
}
}
邮箱的Kconfig
Kconfig | 描述 |
---|---|
CONFIG_NUM_MBOX_ASYNC_MSGS | 异步邮箱中邮件的最大数量 |