简介
参考资料
Synchronization primitives in the Linux kernel.
如何理解互斥锁、条件锁、读写锁以及自旋锁?
Let’s Talk Locks!
Linux内核RCU(Read Copy Update)锁简析
专有名词
并发执行
并发执行是产生的四种情况。
1 - 中断
中断会异步的打断任何正在执行的任务
2 - 内核的抢占
3 - 睡眠
4 - 对称多处理器
竞争
出现下面两种情况的时候,有可能会产生竞争。
- 第一个是有两个可执行上下文并发执行的时候,或者是其中一个上下文可以随意抢占另外一个,比如说中断抢占系统调用。
- 第二种情况是可执行上下文对共享变量执行“读写”的访问。
这些竞争条件会导致各种难以调试的错误,比如说释放资源,一般来说资源只需要释放一次。若是在释放资源的时候,另外一个进程抢占了内核,再释放一次这个资源,就导致了错误。
临界区
前面我们讲述了竞争条件会导致各种错误,为了解决这个问题,就提出了临界区。
临界区就是访问和操作共享数据的代码段。一般是让临界区被原子执行,这就是所谓的原子操作,换句话说,临界区执行的时候不可被打断。那么我们怎么保护临界区不被打断了。
- 是临界区原子的执行。
- 进入临界区以后,禁止抢占。
- 比如禁止中断,禁止下半部分处理,禁止线程抢占等等。
- 串行的访问临界区
内核中的原子类型
定义
include/linux/types.h
typedef struct {
int counter;
} atomic_t;
- 为什么内核中的原子类型要将一个整形放入一个结构体里面了?
API
以arm架构为例,arch/arm/include/asm/atomic.h
atomic_read(atomic_t * v);
atomic_set(atomic_t * v, int i);
void atomic_add(int i, atomic_t *v);
atomic_sub(int i, atomic_t *v);
int atomic_sub_and_test(int i, atomic_t *v);
int atomic_sub_and_test(int i, atomic_t *v);
void atomic_inc(atomic_t *v);
void atomic_dec(atomic_t *v);
int atomic_dec_and_test(atomic_t *v);
int atomic_inc_and_test(atomic_t *v);
int atomic_add_negative(int i, atomic_t *v);
int atomic_add_return(int i, atomic_t *v);
int atomic_sub_return(int i, atomic_t *v);
int atomic_inc_return(atomic_t * v);
int atomic_dec_return(atomic_t * v);
void atomic_inc(atomic_t *v);
void atomic_dec(atomic_t *v);
int atomic_dec_and_test(atomic_t *v);
int atomic_inc_and_test(atomic_t *v);
int atomic_add_negative(int i, atomic_t *v);
int atomic_add_return(int i, atomic_t *v);
int atomic_sub_return(int i, atomic_t *v);
int atomic_inc_return(atomic_t * v);
int atomic_dec_return(atomic_t * v);
共享队列与加锁
访问共享队列的时候,首先需要加锁,以防止其他进程的并发访问。
local_irq_disable() // 禁止中断
// 临界区
local_irq_enable() //开启中断
这个操作在单核CPU上比较实用,但是在多核CPU上捉襟见肘了。不能处理SMP引发的竞争(并行)。并且中断禁止以后,会导致其他所有的数据不能处理,会带来很不好的结果,比如说数据丢失之类的。
自旋锁
中断屏蔽的缺点让自旋锁产生了,其主要目的是为了解决SMP的并行执行而引入的。自旋锁同一时刻只能被一个内核拥有,并且不允许睡眠,不应该长时间拥有。推荐资料
定义
typedef struct spinlock {
union {
struct raw_spinlock rlock;
#ifdef CONFIG_DEBUG_LOCK_ALLOC
# define LOCK_PADSIZE (offsetof(struct raw_spinlock, dep_map))
struct {
u8 __padding[LOCK_PADSIZE];
struct lockdep_map dep_map;
};
#endif
};
} spinlock_t;
使用
static DEFINE_SPINLOCK(iort_msi_chip_lock);/* lock for queue */
spin_lock(&iort_msi_chip_lock);
/*临界区*/
spin_unlock(&iort_msi_chip_lock); /*解锁*/
信号量
定义
# include/linux/semaphore.h
/* Please don't access any members of this structure directly */
struct semaphore {
raw_spinlock_t lock; # 使用的自选锁,防止并发访问出错
unsigned int count; # 使用计数
struct list_head wait_list; # 等待资源的进程队列
};
对信号量的操作
对信号量的操作可以理解为 操作系统里面的VP操作。
void down(struct semaphore *sem); # P操作
# 将count减1
void up(struct semaphore *sem); # v操作
# 将count加1
信号量与自旋锁比较
需求 | 建议加锁方法 |
---|---|
低开销加锁 | 推荐自旋锁 |
短期加锁 | 推荐自旋锁 |
长期加锁 | 推荐自信号量 |
中断上下文加锁 | 使用自旋锁 |
持有锁时需要睡眠,调度 | 使用信号量 |
内核其他的同步措施
- 互斥锁
- 完成变量
- RCU机制
互斥锁
互斥锁和信号量为1的时候的情况有些类似。可以允许随眠完成变量
完成变量基于等待队列机制实现的。当内核中一个任务需要发出一个信号通知另外一个任务发生了特定的事情,就可以使用完成变量。RCU机制(写时复制)
读-拷贝修改锁机制,允许多个读者同时访问被保护的数据,又允许多个读者和多个写者同时访问被保护的数据。
实践
接下来,我们实践内核的多任务并发实例。主要使用的到工具包含
- 信号量
- 自旋锁
- 工作队列
- 内核定时器
实现的大致步骤
首先在内核中建立一个共享链表,使用自旋锁结构对其并发保护。
然后利用工作队列机制建立若干个内核线程,每一个内核线程都有权限对链表插入,删除操作。
其次创建内核定时器,编写回调函数,使其在内核到期时,删除共享链表中的节点。
最后卸载模块,销毁链表,释放资源。
创建锁以及信号量
/*定义信号量*/
static DEFINE_SEMAPHORE(sem); /*内核线程启动之间的通讯*/
/*自旋锁*/
static DEFINE_SPINLOCK(my_lock); /*链表操作的保护*/
创建内核线程
使用工作队列来创建内核线程
/*工作列表*/
static struct work_struct queue;
/*线程*/
static int shareList(void *data)
spin_lock(&my_lock); /*加锁*/
... 临界区 操作链表
负责创建新节点,将节点插入链表
以及删除节点,将节点从链表中删除
spin_unlock(&my_lock); /*解锁*/
/*工作队列的回调函数*/
static void kthread_launcher(struct work_struct *wq)
kthread_run(shareList,NULL,"%d",count);
/*资源加1*/
up(&sem);
/*调用keventd 运行内核线程*/
static void start_kthread(void)
/*为了防止SMP产生并行竞争,使用信号量*/
down(&sem);
/*运行工作队列*/
schedule_work(&queue);
init:
INIT_WORK(&queue,kthread_launcher);
/*调用工作队列来创建线程*/
for i in range(200):
start_kthread()
内核定时器
#include <linux/timer.h>
/*内核定时器*/
static struct timer_list mytimer;
/*定时器回调函数*/
static void qt_task(unsigned long data)
struct timer_list *timer = (struct timer_list *)data;
spin_lock(&my_lock); /*加锁*/
... 临界区 操作链表
spin_unlock(&my_lock); /*解锁*/
mod_timer(timer,jiffies+msecs_to_jiffies(1000));
init:
/*将 mytimer 本身作为参数,传给回调函数*/
setup_timer(&mytimer,qt_task,(unsigned long)&mytimer);
add_timer(&mytimer);
退出
_exit:
del_timer(&mytimer);
spin_lock(&my_lock); /*加锁*/
... 临界区 操作链表
spin_unlock(&my_lock); /*解锁*/