前言

由于在接下来的学习内容中涉及到对信息、数据的同步,我们在开发操作系统的时候必须考虑到并发、多核情况下的同步问题,要想解决这些问题,实际应用中一般采用锁:原子变量、关中断、信号量、自旋锁。

那么其底层如何实现的呢?

——让我们跟着彭老师的讲解,一起学习!

以对一个全局变量进行操作为例,下述代码是一个中断处理函数和逻辑函数,它们都是对全局变量a进行操作

  1. int a = 0;
  2. void interrupt_handle()
  3. {
  4. a++;
  5. }
  6. void thread_func()
  7. {
  8. a++;
  9. }

对于a++,在编译阶段一般是将其分为三步:

  • 将a存入某寄存器
  • 该寄存器值+1
  • 将寄存器的值刷新回内存

但是,如果在第二步完成之后突然发生中断了该如何?由时刻为标尺对a的值进行观测:

6.同步(锁、信号量)底层原理 - 图1

可以看到,由于中断的发生将thread_func的执行过程强行掐断,最终a的值为1,但其实它的值应该为2

那么如何解决这个问题?

一是,将a++这个操作变成不可分割,即无法再拆解成3步执行,即是原子操作

二是,在执行a++这个操作时,关闭中断,执行完毕之后再打开中断

原子操作

原子操作需要底层操作系统支持,X86 CPU支持许多原子指令C语言正是通过嵌入汇编代码调用这些原子指令来实现原子操作,而Java是在JVM层面对原子操作进行了实现。

另外,内嵌汇编代码编写格式的学习可参考:AT&T格式汇编代码,也可参考文末的补充部分进行理解。

实现原子读和原子写等:

  1. //定义一个原子类型
  2. typedef struct s_ATOMIC{
  3. volatile s32_t a_count; //禁止编译器优化,使其每次都从内存中加载变量
  4. }atomic_t;
  5. //原子读
  6. static inline s32_t atomic_read(const atomic_t *v)
  7. {
  8. //x86平台取地址处是原子
  9. return (*(volatile u32_t*)&(v)->a_count);
  10. }
  11. //原子写
  12. static inline void atomic_write(atomic_t *v, int i)
  13. {
  14. //x86平台把一个值写入一个地址处也是原子的
  15. v->a_count = i;
  16. }
  17. //原子加上一个整数
  18. static inline void atomic_add(int i, atomic_t *v)
  19. {
  20. __asm__ __volatile__("lock;" "addl %1,%0"
  21. : "+m" (v->a_count)
  22. : "ir" (i));
  23. }
  24. //原子减去一个整数
  25. static inline void atomic_sub(int i, atomic_t *v)
  26. {
  27. __asm__ __volatile__("lock;" "subl %1,%0"
  28. : "+m" (v->a_count)
  29. : "ir" (i));
  30. }
  31. //原子加1
  32. static inline void atomic_inc(atomic_t *v)
  33. {
  34. __asm__ __volatile__("lock;" "incl %0"
  35. : "+m" (v->a_count));
  36. }
  37. //原子减1
  38. static inline void atomic_dec(atomic_t *v)
  39. {
  40. __asm__ __volatile__("lock;" "decl %0"
  41. : "+m" (v->a_count));
  42. }

注意,“lock”前缀是多核CPU下保证数据同步的指令,该指令会锁住数据总线,防止其他CPU更改对应内存地址的值,单核CPU不需要,更详细内容查看内存屏障、validate

在使用原子操作的情况下,原本对全局变量的操作就可以改成:

  1. atomic_t a = {0};
  2. void interrupt_handle()
  3. {
  4. atomic_inc(&a);
  5. }
  6. void thread_func()
  7. {
  8. atomic_inc(&a);
  9. }

关中断

上述原子操作虽然可以完成同步操作,但是只能对付一些简单的单体变量,对于复杂的数据结构,如果使用原子操作,可想而知代码的复杂程度有多大。

在这个时候可以考虑通过关闭中断,从而实现相应的代码控制

x86平台上的CPU关闭中断、开启中断指令是cli、sti,其主要是对CPU的eflags寄存的IF位进行设置,CPU据此来决定是否响应中断信号

简单的关、开中断代码:

  1. //关闭中断
  2. void hal_cli()
  3. {
  4. __asm__ __volatile__("cli": : :"memory");
  5. }
  6. //开启中断
  7. void hal_sti()
  8. {
  9. __asm__ __volatile__("sti": : :"memory");
  10. }
  11. //使用场景
  12. void foo()
  13. {
  14. hal_cli();
  15. //操作数据……
  16. hal_sti();
  17. }
  18. void bar()
  19. {
  20. hal_cli();
  21. //操作数据……
  22. hal_sti();
  23. }

上述方式的问题就在于无法嵌套使用:

  1. void foo()
  2. {
  3. hal_cli();
  4. //操作数据第一步……
  5. hal_sti();
  6. }
  7. void bar()
  8. {
  9. hal_cli();
  10. foo();
  11. //操作数据第二步……
  12. hal_sti();
  13. }

当从foo()函数返回到bar()时,这时中断已经被打开,但是bar()不知道!如果继续执行操作数据第二步,那么就可能因为其他线程函数的访问造成数据不一致问题。

为了让中断管理可以嵌套,就需要在关闭、开启中断的时候保存之前的状态,如下:

  1. typedef u32_t cpuflg_t;
  2. static inline void hal_save_flags_cli(cpuflg_t* flags)
  3. {
  4. __asm__ __volatile__(
  5. "pushfl \t\n" //把eflags寄存器压入当前栈顶
  6. "cli \t\n" //关闭中断
  7. "popl %0 \t\n"//把当前栈顶弹出到flags为地址的内存中
  8. : "=m"(*flags)
  9. :
  10. : "memory"
  11. );
  12. }
  13. static inline void hal_restore_flags_sti(cpuflg_t* flags)
  14. {
  15. __asm__ __volatile__(
  16. "pushl %0 \t\n"//把flags为地址处的值寄存器压入当前栈顶
  17. "popfl \t\n" //把当前栈顶弹出到flags寄存器中
  18. :
  19. : "m"(*flags)
  20. : "memory"
  21. );
  22. }

简单来说,在关闭中断的时候,把之前的状态存入地址为flag的内存中;在开启中断时,究竟是否开启中断,是由flag地址中存储的值来确定的!(即进行关闭中断操作之前的中断状态)

注:内存中的flag只是用来保存的,CPU是否中断由寄存器中的值而定

注:注意区分中断与子程序调用的区别

注:这里没有区分中断的优先级,但是实际的操作系统中,低级中断应该被高级中断所打断

自旋锁

中断完美解决了原子操作只能针对单体变量的情况,但是——中断只能控制单核CPU,在多核CPU的情况下,又会遇到并发冲突的问题了,这个时候就需要使用“自旋锁”。

自旋锁原理如下:

6.同步(锁、信号量)底层原理 - 图2

上述流程有一个必须保证的点:读取锁变量、判断加锁操作必须是原子操作,否则还是会造成并发错误

好在x86 提供了一个原子交换指令:xchg——让寄存器的值和内存空间的值进行交换

根据上述流程图,将自旋锁实现如下:

  1. //自旋锁结构
  2. typedef struct
  3. {
  4. //volatile可以防止编译器优化
  5. //保证其它代码始终从内存加载lock变量的值
  6. volatile u32_t lock;
  7. } spinlock_t;
  8. //锁初始化函数
  9. static inline void x86_spin_lock_init(spinlock_t * lock)
  10. {
  11. lock->lock = 0;//锁值初始化为0是未加锁状态
  12. }
  13. //加锁函数
  14. static inline void x86_spin_lock(spinlock_t * lock)
  15. {
  16. __asm__ __volatile__ (
  17. "1: \n"
  18. "lock; xchg %0, %1 \n"//把值为1的寄存器和lock内存中的值进行交换
  19. "cmpl $0, %0 \n" //用0和交换回来的值进行比较
  20. "jnz 2f \n" //不等于0则跳转后面2标号处运行
  21. "jmp 3f \n" //若等于0则跳转后面3标号处返回
  22. "2: \n"
  23. "cmpl $0, %1 \n"//用0和lock内存中的值进行比较
  24. "jne 2b \n"//若不等于0则跳转到前面2标号处运行继续比较
  25. "jmp 1b \n"//若等于0则跳转到前面1标号处运行,交换并加锁
  26. "3: \n" :
  27. : "r"(1), "m"(*lock));
  28. }
  29. //解锁函数
  30. static inline void x86_spin_unlock(spinlock_t * lock)
  31. {
  32. __asm__ __volatile__(
  33. "movl $0, %0\n"//解锁把lock内存中的值设为0就行
  34. :
  35. : "m"(*lock));
  36. }

其中加锁函数的逻辑部分要好好理解,它通过转移指令形成了一个循环判断的逻辑,直到加锁才会退出。

注意,代码中汇编部分: : “r”(1), “m”(*lock)——系统分配一个寄存器,填入1;取内存地址为lock的值;而后xchg %0,%1即是将两者的值进行交换。

遗憾的是上述代码存在中断嵌套的问题:“如果一个CPU获取自旋锁后发生中断,中断代码里也尝试获取自旋锁,那么自旋锁永远不会被释放,发生死锁。

关于自旋锁与中断的详细解释可以参考:Linux内核死锁,其中关于自旋锁与中断嵌套导致的一个经典场景叙述如下

“考虑下面的场景(中断上下文场景):

  • 运行在CPU0上的进程A在某个系统调用过程中访问了共享资源 R
  • 运行在CPU1上的进程B在某个系统调用过程中也访问了共享资源 R
  • 外设P的中断handler中也会访问共享资源 R

在这样的场景下,使用spin lock可以保护访问共享资源R的临界区吗?

我们假设CPU0上的进程A持有spin lock进入临界区,这时候,外设P发生了中断事件,并且调度到了CPU1上执行,看起来没有什么问题,执行在CPU1上的handler会稍微等待一会CPU0上的进程A,等它离开临界区就会释放spin lock的,但是,如果外设P的中断事件被调度到了同一个CPU0上执行会怎么样?

CPU0上的进程A在持有spin lock的状态下被中断上下文抢占,而抢占它的CPU0上的handler在进入临界区之前仍然会试图获取spin lock,悲剧发生了,CPU0上的P外设的中断handler永远的进入spin状态,这时候,CPU1上的进程B也不可避免在试图持有spin lock的时候失败而导致进入spin状态

为了解决这样的问题,linux kernel采用了这样的办法:如果涉及到中断上下文的访问,spin lock需要和禁止本 CPU 上的中断联合使用。

关于这一点的解决方式可以参考关中断,详细讲解请看:08 锁提示一下,获取自旋锁的时候,干脆把中断关闭了就好,这样就不会导致中断嵌套

经修改后,可以实现关中断下获取自旋锁,以及恢复中断状态释放自旋锁

  1. static inline void x86_spin_lock_disable_irq(spinlock_t * lock
  2. ,cpuflg_t* flags)
  3. {
  4. __asm__ __volatile__(
  5. "pushfq \n\t"
  6. "cli \n\t"
  7. "popq %0 \n\t"
  8. "1: \n\t"
  9. "lock; xchg %1, %2 \n\t"
  10. "cmpl $0,%1 \n\t"
  11. "jnz 2f \n\t"
  12. "jmp 3f \n"
  13. "2: \n\t"
  14. "cmpl $0,%2 \n\t"
  15. "jne 2b \n\t"
  16. "jmp 1b \n\t"
  17. "3: \n"
  18. :"=m"(*flags)
  19. : "r"(1), "m"(*lock));
  20. }
  21. static inline void x86_spin_unlock_enabled_irq(spinlock_t* lock
  22. ,cpuflg_t* flags)
  23. {
  24. __asm__ __volatile__(
  25. "movl $0, %0\n\t"
  26. "pushq %1 \n\t"
  27. "popfq \n\t"
  28. :
  29. : "m"(*lock), "m"(*flags));
  30. }

代码中的cpuflg表示当前的中断状态。

信号量

以上三种解决同步的方式都不适合长等待,利用自旋锁这种方式去获取需要一定时间准备的资源,并且会造成CPU的时间消耗。

试想,能不能有一种机制,当资源准备好了之后,提醒CPU去获取呢

还真有,那就是在1965年由荷兰学者Edsger Dijkstra(没错,就是提出那个算法的男人)提出的信号量机制

信号量机制由三个环节组成

  • 等待:程序等待资源准备好
  • 互斥:同时只有一个程序可以访问资源
  • 唤醒:资源准备好之后唤醒固定程序

由于需要等待、互斥等操作,拟定一个数据结构如下:

  1. //等待链数据结构,用于挂载等待代码执行流(线程)的结构
  2. //里面有用于挂载代码执行流的链表和计数器变量,后续会讲
  3. typedef struct s_KWLST
  4. {
  5. spinlock_t wl_lock;//等待链表的锁
  6. uint_t wl_tdnr;//计数器
  7. list_h_t wl_list;//等待进程的链表
  8. }kwlst_t;
  9. //信号量数据结构
  10. typedef struct s_SEM
  11. {
  12. spinlock_t sem_lock;//维护sem_t自身数据的自旋锁
  13. uint_t sem_flg;//信号量相关的标志
  14. sint_t sem_count;//信号量计数值
  15. kwlst_t sem_waitlst;//用于挂载等待代码执行流(线程)结构
  16. }sem_t;

想想信号量一般是怎么使用的呢?

  1. 获取信号量

将信号量自身加锁如果信号值sem_count小于0,则将当前进程放入等待链;否则,对信号量执行“减一”,获取信号量成功,进入代码执行流程;

2. 代码执行

成功获取信号量之后,程序进行自己相应的操作逻辑。

3. 释放信号量

将信号量自身加锁,对信号值sem_count执行“加一”,如果大于0,则从等待链中唤醒一个进程;无论是否大于0,最终即完成信号量的释放

从上述流程可以看出,信号量其实就是一个“多人使用,用完放回”的场景。

根据以上分析一个简单的信号量实现如下:

  1. //获取信号量
  2. void krlsem_down(sem_t* sem)
  3. {
  4. cpuflg_t cpufg;
  5. start_step:
  6. //之前自旋锁的封装
  7. krlspinlock_cli(&sem->sem_lock,&cpufg);
  8. if(sem->sem_count<1)
  9. {//如果信号量值小于1,则让代码执行流(线程)睡眠
  10. krlwlst_wait(&sem->sem_waitlst);
  11. //之前自旋锁的封装
  12. krlspinunlock_sti(&sem->sem_lock,&cpufg);
  13. //切换代码执行流,下次恢复执行时依然从下一行开始执行
  14. //所以要goto开始处重新获取信号量进行判断
  15. krlschedul();
  16. goto start_step;
  17. }
  18. sem->sem_count--;//信号量值减1,表示成功获取信号量
  19. //之前自旋锁的封装
  20. krlspinunlock_sti(&sem->sem_lock,&cpufg);
  21. return;
  22. }
  23. //释放信号量
  24. void krlsem_up(sem_t* sem)
  25. {
  26. cpuflg_t cpufg;
  27. //之前自旋锁的封装
  28. krlspinlock_cli(&sem->sem_lock,&cpufg);
  29. sem->sem_count++;//释放信号量
  30. if(sem->sem_count<1)
  31. {//如果小于1,则说数据结构出错了,挂起系统
  32. krlspinunlock_sti(&sem->sem_lock,&cpufg);
  33. hal_sysdie("sem up err");
  34. }
  35. //唤醒该信号量上所有等待的代码执行流(线程)
  36. krlwlst_allup(&sem->sem_waitlst);
  37. krlspinunlock_sti(&sem->sem_lock,&cpufg);
  38. krlsched_set_schedflgs();
  39. return;
  40. }

其中krlschedul、krlwlst_wait、krlwlst_allup、krlsched_set_schedflgs是负责进程调度的相关函数,会在之后的进程章节进行讲解,敬请期待!

拓展:Linux的同步机制

Linux上实现数据同步主要有五个工具:原子变量、中断控制、自旋锁、信号量、读写锁。重点如下:

6.同步(锁、信号量)底层原理 - 图3

有关这部分的实现代码及讲解可以参考:09. Linux锁机制。

补充:GCC内嵌汇编代码

在 C 代码中嵌入汇编语句要比”纯粹”的汇编语言代码复杂得多,因为需要解决如何分配寄存器,以及如何与C代码中的变量相结合等问题。

通常嵌入到 C 代码中的汇编语句很难做到与其它部分没有任何关系,因此很多时候需要用到完整的内联汇编格式

  1. __asm__("asm statements" : outputs : inputs : registers-modified);

内嵌汇编格式分为四个部分,其中每个部分用“:”符号来隔断,每个部分中的各个操作用“;”来区分。

  • asm statements:汇编代码部分,即指令部分,操作寄存器需要带上%,操作立即数需要带上$;
  • output:输出列表部分,包括寄存器、内存,=号来连接;
  • Input:输入列表部分,包括寄存器、内存、立即数等;
  • register-modified:寄存器修改列表部分,这些寄存器在汇编代码中使用了,但是不属于输入和输出列表,因此对寄存器进行说明,以便GCC能够采用相应的措施,比如不用它们来保存其他的数据。

我们以下面的代码为例:

  1. int main()
  2. {
  3. int a = 10, b = 0;
  4. __asm__ __volatile__(
  5. ;这两行都是指令部分,%1、%0分别表示操作第二个、第一个“样板”操作数
  6. "movl %1, %%eax;\\n\\r"
  7. "movl %%eax, %0;"
  8. ;从输出部分开始计数,这就是第一个“样板”操作数:b
  9. ;不过标号为0
  10. :"=r"(b)
  11. ;这是输入部分,第二个样板操作数:a,标号为1
  12. :"r"(a)
  13. ;这是修改寄存器部分,由于用到了eax,告诉gcc在执行过程中该寄存器不能
  14. ;保存其他值
  15. :"%eax");
  16. printf("Result: %d, %d\\n", a, b);
  17. }

上述代码完成操作:将a的值赋值给b。

参考链接

08 | 锁:并发操作中,解决数据同步的四种方法

AT&T格式汇编

信号量和条件变量的区别