管道是一个内核对象,它允许一个线程字节流发送到另一个线程。管道可用于同步传输全部或部分数据块。

概念

管道可以配置一个环形缓冲区,用于保存已发送但尚未接收的数据。或者管道也可能没有环形缓冲区
可以定义任意数量的管道(仅受可用 RAM 的限制)。每个管道都由其内存地址引用。
管道具有以下键属性:

  • 指示管道的环形缓冲区的大小的大小。请注意,零代表没有环形缓冲区的管道。

管道必须先初始化,然后才能使用。管道最初是空的。
数据由线程全部或部分同步发送到管道。如果无法立即满足指定的最小字节数,则操作将立即失败或者尝试发送尽可能多的字节,然后挂起,期望稍后可以完成发送。接受的数据要么复制到管道的环形缓冲区,要么直接复制到等待的读取器。
数据由线程从管道同步接收。如果不能立即满足指定的最小字节数,则操作将立即失败或者尝试接收尽可能多的字节,然后挂起,期望稍后可以完成接收。接受的数据要么从管道的环形缓冲区复制,要么直接从等待的发送方复制。
数据也可以通过线程从管道中刷新。刷新可以在整个管道上执行,也可以仅在其环形缓冲区上执行。

  • 刷新管道等效于将管道缓冲区中的所有数据和等待进入该管道的所有数据读取到大型临时缓冲区中并丢弃缓冲区。
  • 刷新环形缓冲区等效于仅将环形缓冲区中的数据读取到临时缓冲区中,然后丢弃该临时缓冲区。

刷新环形缓冲区并不能保证环形缓冲区将保持为空;刷新它可能允许挂起的写入器填充环形缓冲区。

刷新实际上不会分配或使用其他缓冲区。

定义管道

管道是使用k_pipe类型定义变量,并且使用unsigned char类型定义可选字符缓冲区。然后必须通过调用k_pipe_init()对其进行初始化。
下面的代码定义并初始化一个空管道,该管道具有能够容纳 100 个字节并对齐到 4 字节边界的环形缓冲区。

  1. unsigned char __aligned(4) my_ring_buffer[100];
  2. struct k_pipe my_pipe;
  3. k_pipe_init(&my_pipe, my_ring_buffer, sizeof(my_ring_buffer));

也可以通过调用K_PIPE_DEFINE在编译时定义和初始化管道。

  1. K_PIPE_DEFINE(my_pipe, 100, 4);

写入管道

通过调用k_pipe_put()将数据添加到管道中。
管道将数据从生产线程传递到一个或多个使用线程。如果管道的环形缓冲区由于使用者跟不上而填满,则生产线程将等待指定的时间。

  1. struct message_header {
  2. ...
  3. };
  4. void producer_thread(void)
  5. {
  6. unsigned char *data;
  7. size_t total_size;
  8. size_t bytes_written;
  9. int rc;
  10. ...
  11. while (1) {
  12. /* Craft message to send in the pipe */
  13. data = ...;
  14. total_size = ...;
  15. /* send data to the consumers */
  16. rc = k_pipe_put(&my_pipe, data, total_size, &bytes_written,
  17. sizeof(struct message_header), K_NO_WAIT);
  18. if (rc < 0) {
  19. /* Incomplete message header sent */
  20. ...
  21. } else if (bytes_written < total_size) {
  22. /* Some of the data was sent */
  23. ...
  24. } else {
  25. /* All data sent */
  26. ...
  27. }
  28. }
  29. }

读取管道

通过调用k_pipe_get()从管道中读取数据。

  1. void consumer_thread(void)
  2. {
  3. unsigned char buffer[120];
  4. size_t bytes_read;
  5. struct message_header *header = (struct message_header *)buffer;
  6. while (1) {
  7. rc = k_pipe_get(&my_pipe, buffer, sizeof(buffer), &bytes_read,
  8. sizeof(header), K_MSEC(100));
  9. if ((rc < 0) || (bytes_read < sizeof (header))) {
  10. /* Incomplete message header received */
  11. ...
  12. } else if (header->num_data_bytes + sizeof(header) > bytes_read) {
  13. /* Only some data was received */
  14. ...
  15. } else {
  16. /* All data was received */
  17. ...
  18. }
  19. }
  20. }

如果需要,管道可用于传输长数据流。但是,通常最好将指针发送到大型数据项以避免复制数据。

刷新管道的缓冲区

通过调用k_pipe_buffer_flush()从管道的环形缓冲区刷新数据。

  1. void monitor_thread(void)
  2. {
  3. while (1) {
  4. ...
  5. /* Pipe buffer contains stale data. Flush it. */
  6. k_pipe_buffer_flush(&my_pipe);
  7. ...
  8. }
  9. }

刷新管道

管道中的所有数据都通过调用k_pipe_flush()进行刷新。

  1. void monitor_thread(void)
  2. {
  3. while (1) {
  4. ...
  5. /* Critical error detected. Flush the entire pipe to reset it. */
  6. k_pipe_flush(&my_pipe);
  7. ...
  8. }
  9. }