5.1 内存模型基础

5.1.2 对象、内存区域和并发

假设两个线程访问同一内存区域,却没有强制它们服从一定的访问次序,如果其中至少有一个是非原子化访问并且至少有一个是写操作,就会出现数据竞争,导致未定义行为。

5.2 C++中的原子操作及其类别

原子操作是不可分割的操作。在系统的任一线程内,都不会观察到原子操作出于半完成状态,要么完全做好、要么完全没做

5.2.1 标准原子类型

原子操作的关键用途是取代需要互斥的同步方式

只有一个原子类型不提供is_lock_free()成员函数:**std::atomic_flag**,是一个简单的布尔标志,所以必须采用无锁操作。只要利用这种无锁布尔标志,就可以实现一个简易的锁,进而基于该锁实现其他原子类型
std::atomic_flag的对象在初始化时清零,随后即可通过成员函数test_and_set()查值并设置成立,或者由clear()清零。

image.png

由于不具备拷贝构造和拷贝赋值,所以标准的原子类型对象无法复制和赋值

类模板std::atomic<>并不局限于特化类型,也具有泛化模板可依据用户自定义类型创建原子类型的变体。仅具有以下操作:

  • load():接受用户自定义类型的赋值。
  • store():转化为用户自定义类型、
  • exchange()compare_exchange_weak()compare_exchange_strong()

对原子类型上的每一种操作都可以提供额外的参数,从枚举类型std::memory_order取值,用于设定所需的内存次序语意
image.png

5.2.2 操作 std::atomic_flag

std::atomic_flag表示一个布尔标志,只有两种状态:成立置零。唯一用途是充当构建单元,一般不会直接使用它。
std::atomic_flag类型的对象必须由宏ATOMIC_FLAG_INIT初始化,它将标志初始化为置零状态

  1. std::atomid_flag = ATOMIC_FLAG_INIT;

所有原子类型中,只有std::atomic_flag必须采用这种特殊初始化处理,也是唯一保证无锁的原子类型
完成std::atomic_flag的初始化后,只能执行 3 种操作:

  • 销毁(析构函数);
  • 置零(成员函数clear()
  • 读取原有的值并设置标志成立(成员函数test_and_set())。

image.png

正因为std::atomic_flag功能有限,所以可以完美扩展成自旋锁互斥。最开始令原子标志置零表示互斥没有上锁反复调用**test_and_set()**尝试锁住互斥,一旦读取的值变成false说明线程已将标志设置成立(新值为true)则循环终止。将标志置零即可解锁互斥

  1. //采用 std::atomic_flag 实现自旋锁互斥
  2. #include <atomic>
  3. #include <iostream>
  4. #include <mutex>
  5. #include <thread>
  6. #include <vector>
  7. using namespace std;
  8. class spinlock_mutex {
  9. std::atomic_flag flag;
  10. public:
  11. spinlock_mutex() : flag(ATOMIC_FLAG_INIT) {}
  12. void lock() {
  13. while (flag.test_and_set(std::memory_order_acquire))
  14. ;
  15. }
  16. void unlock() {
  17. flag.clear();
  18. }
  19. };
  20. int k = 0;
  21. auto main() -> int {
  22. spinlock_mutex m;
  23. std::vector<std::thread> vec{};
  24. for (int i = 0; i < 20; ++i)
  25. vec.emplace_back([&] {
  26. std::lock_guard lk(m);
  27. ++k;
  28. });
  29. for (auto& p : vec)
  30. p.join();
  31. std::cout << k << endl;
  32. }

5.2.3 操作 std::atomic

std::atomic<bool>无法拷贝构造或拷贝赋值,但是还是可以根据非原子布尔量创建其对象:image.png
HINT:按照C++惯例,赋值操作符通常返回一个引用,指向接受赋值的目标对象(登号左侧的对象)。但是std::atomic<bool>赋值不返回引用直接返回赋予的布尔值
image.png
假设返回的是指向原子变量的引用,若有代码依赖赋值操作的结果那它必须随之显式地加载该结果的值,而此时另一线程可能在返回和加载间改动其值。所以按值返回可以避开多余的加载动作,从而保证获取的值正确。

std::atomic<bool>写操作通过调用**store()**,也可以设定内存次序语义。此外std::atomic<bool>提供了更通用的成员函数**exchange()**以代替**test_and_set()**,它获取原有的值,还可以自行选定新值作为替换
std::atomic<bool>还支持单纯的读取(没有伴随的修改行为):隐式做法是将实例转换为普通布尔值显式做法是调用**load()**

  1. #include <atomic>
  2. #include <iostream>
  3. using namespace std;
  4. auto main() -> int {
  5. std::atomic_bool b;
  6. bool x = b.load();
  7. std::cout << std::boolalpha << "x = " << x << std::endl;
  8. b.store(true);
  9. x = b.exchange(false, std::memory_order_acq_rel);
  10. std::cout << "x = " << x << std::endl;
  11. }
  12. //result
  13. flase
  14. true

5.2.4 操作 std::atomic:算数形式的指针运算

atomic<bool>大致相同。

std::atomic<T*>提供的新操作是算数形式的指针运算。成员函数fetch_add()fetch_sub()给出最基本的操作,分别就对象中存储的地址进行原子化加减,但是返回的是运算之前的地址

  1. #include <atomic>
  2. #include <cassert>
  3. #include <iostream>
  4. using namespace std;
  5. class Foo {};
  6. auto main() -> int {
  7. Foo some_array[5];
  8. std::atomic<Foo*> p(some_array);
  9. Foo* x = p.fetch_add(2);
  10. assert(x == some_array);
  11. assert(p.load() == &some_array[2]);
  12. x = (p -= 1);
  13. assert(x == &some_array[1]);
  14. assert(p.load() == &some_array[1]);
  15. }

5.2.6 泛化的 std::atomic<>类模板

可以根据自定义类型创建其他原子类型,所提供的接口与**std::atomic<bool>**相同

不能随意套用任何自定义类型,对于某个自定义类型UDT,要满足下列条件才能具现化出std::atomic<UDT>

  • 必须具备 trivial 拷贝赋值操作符展现出 bitwise 逐位次拷贝);
  • 不得含有任何虚函数不可以派生自虚基类
  • 必须由编译器代其隐式生成拷贝赋值操作符
  • 另外,若具有基类或非静态数据成员,则必须同样具备 trivial 拷贝赋值运算符

所以,根据以上要求,可以看出,赋值操作不涉及任何用户编写的代码,因此编译器可以借助**memcpy()**或采取与之等效的行为完成该操作。

如果采用锁保护数据,而代码又涉及使用者提供的函数,则不得将受保护数据的指针或引用传入该函数,会让它脱离锁的作用域。
image.png

5.2.7 原子操作的非成员函数

C++提供非成员函数,按原子化形式访问**std::shared_ptr<>**的实例。但是这些操作的第一个参数都是std::shared_ptr<>*类型:

  1. std::shared_ptr<int> p{};
  2. void process_global_data() {
  3. std::shared_ptr<int> local = std::atomic_load(&p);
  4. }
  5. void update_global_data() {
  6. std::shared_ptr<int> local(new int(3));
  7. std::atomic_store(&p, local);
  8. }

5.3 同步操作和强制次序

5.3.1 同步关系

同步关系只存在于原子类型的操作之间
同步关系的基本思想是:对变量x执行原子写操作 W 和原子读操作 R,且两者都有适当的标记。只要满足下面其中一点,即彼此同步:

  • R 读取了 W 直接存入的值;
  • W 所属线程随后还执行了另一原子写操作, R 读取了后面存入的值;
  • 任意线程执行一连串“读-改-写”操作,而其中第一个操作读取的值由 W 写出。

    5.3.2 先行关系

    先行关系严格先行关系在程序中确立操作次序:清楚界定哪些操作产生的结果可以被哪些操作看到。

线程间先行关系先行关系中,各种操作时都被标记为memory_order_consume,而严格先行关系则无此标记。

5.3.3 原子操作的内存次序

原子类型服从六种内存次序:memory_order_relaxedmemory_order_consumememory_order_acquirememory_order_releasememory_order_acq_relmemory_order_seq_cst。其中最后一个是最严格的内存次序
代码中的内存次序因采用不同内存模型而异。

1、先后一致次序
先后一致次序为默认内存次序。如果采用先后一致次序,则针对多个线程的并发操作,可以写出他们全部可能的顺序组合,将操作不一致的剔除,从而验证代码是否符合预期。
虽然这种内存次序易于理解,但是会导致严重的性能损失,因为必须在多处理器之间维持全局操作次序。

若某项操作标记为memory_order_seq_cst,则编译器和 CPU 严格遵循源码逻辑流程的先后顺序。 在相同的线程上,以该项操作为界,其后方的任何操作不得重新编排到它前面,而前方的任何操作不得重新编排到它后面,其中“任何”是指带有任何内存标记的任何变量之上的任何操作

  1. //保持先后一致次序会形成一个全局总操作序列
  2. #include <atomic>
  3. #include <cassert>
  4. #include <iostream>
  5. #include <thread>
  6. using namespace std;
  7. std::atomic_bool x, y;
  8. std::atomic_int z;
  9. void write_x() {
  10. x.store(true, std::memory_order_seq_cst);
  11. }
  12. void write_y() {
  13. y.store(true, std::memory_order_seq_cst);
  14. }
  15. void read_x_then_y() {
  16. while (!x.load(std::memory_order_seq_cst))
  17. ;
  18. if (y.load(std::memory_order_seq_cst))
  19. ++z;
  20. }
  21. void read_y_then_x() {
  22. while (!y.load(std::memory_order_seq_cst))
  23. ;
  24. if (x.load(std::memory_order_seq_cst))
  25. ++z;
  26. }
  27. auto main() -> int {
  28. x = false;
  29. y = false;
  30. z = 0;
  31. std::thread a(write_x);
  32. std::thread b(write_y);
  33. std::thread c(read_x_then_y);
  34. std::thread d(read_y_then_x);
  35. a.join();
  36. b.join();
  37. c.join();
  38. d.join();
  39. assert(z.load() != 0);
  40. }

断言无论如何不会触发。image.png
先后一致次序要求在所有线程间进行全局同步,因此也是代价最高的内存次序。

2、非先后一致次序
即使在多个线程上运行的代码相同,由于某些线程上的操作没有显式的次序约束,因此他它们有可能无法就多个事件的发生次序达成一致,而在不同的 CPU 缓存或内部缓冲中,同一份内存数据也可能具有不同的值。
即:线程之间不必就事件发生次序达成一致

如果没有指定程序服从哪种内存次序,则采用默认内存次序。要求:全部线程在每个独立变量上都达成一致的修改序列不同变量上的操作构成其特有的序列

3、宽松次序
memory_order_relaxed,若采用宽松次序,那么原子类型上的操作不存在同步关系。在单一线程内同一个变量上的操作仍然服从先行关系,但几乎不要求线程之间存在任何次序关系

  1. #include <assert.h>
  2. #include <atomic>
  3. #include <thread>
  4. std::atomic<bool> x, y;
  5. std::atomic<int> z;
  6. void write_x_then_y() {
  7. x.store(true, std::memory_order_relaxed);
  8. y.store(true, std::memory_order_relaxed);
  9. }
  10. void read_y_then_x() {
  11. while (!y.load(std::memory_order_relaxed))
  12. ;
  13. if (x.load(std::memory_order_relaxed)) ++z;
  14. }
  15. int main() {
  16. x = false;
  17. y = false;
  18. z = 0;
  19. std::thread a(write_x_then_y);
  20. std::thread b(read_y_then_x);
  21. a.join();
  22. b.join();
  23. assert(z.load() != 0);
  24. }

image.png
变量**x**的存储操作和载入操作分属不同线程,因为采用了宽松次序,所以后者不一定能见到前者执行产生的效果,即存储的新值 true 还停留在 CPU 缓存中而读取的 false 值是来 自内存的旧值

  1. #include <atomic>
  2. #include <iostream>
  3. #include <thread>
  4. std::atomic_int x(0), y(0), z(0); // 1
  5. std::atomic_bool go(false); // 2
  6. unsigned const loop_count = 10;
  7. struct read_values {
  8. int x, y, z;
  9. };
  10. read_values values1[loop_count];
  11. read_values values2[loop_count];
  12. read_values values3[loop_count];
  13. read_values values4[loop_count];
  14. read_values values5[loop_count];
  15. void increment(std::atomic_int* var_to_inc, read_values* values) {
  16. while (!go) // 3
  17. std::this_thread::yield();
  18. for (unsigned i = 0; i < loop_count; ++i) {
  19. values[i].x = x.load(std::memory_order_relaxed);
  20. values[i].y = y.load(std::memory_order_relaxed);
  21. values[i].z = z.load(std::memory_order_relaxed);
  22. var_to_inc->store(i + 1, std::memory_order_relaxed); // 4
  23. std::this_thread::yield();
  24. }
  25. }
  26. void read_vals(read_values* values) {
  27. while (!go) // 5
  28. std::this_thread::yield();
  29. for (unsigned i = 0; i < loop_count; ++i) {
  30. values[i].x = x.load(std::memory_order_relaxed);
  31. values[i].y = y.load(std::memory_order_relaxed);
  32. values[i].z = z.load(std::memory_order_relaxed);
  33. std::this_thread::yield();
  34. }
  35. }
  36. void print(read_values* v) {
  37. for (unsigned i = 0; i < loop_count; ++i) {
  38. if (i)
  39. std::cout << ",";
  40. std::cout << "{" << v[i].x << "," << v[i].y << "," << v[i].z << "}";
  41. }
  42. std::cout << std::endl;
  43. }
  44. auto main() -> int {
  45. std::thread t1(increment, &x, values1);
  46. std::thread t2(increment, &y, values2);
  47. std::thread t3(increment, &z, values3);
  48. std::thread t4(read_vals, values4);
  49. std::thread t5(read_vals, values5);
  50. go = true; // 6
  51. t5.join();
  52. t4.join();
  53. t3.join();
  54. t2.join();
  55. t1.join();
  56. print(values1); //7
  57. print(values2);
  58. print(values3);
  59. print(values4);
  60. print(values5);
  61. }

4、理解宽松次序
(看书…)

5、获取-释放次序
获取-释放次序比宽松次序严格一些,会产生一定程度的同步效果,而不会服从先后一致次序的全局总操作序列。在该内存模型中,原子化载入即为获取操作**memory_order_acquire**原子化存储即为释放操作**memory_order_release**,而原子化“读-改-写”操作即为获取或释放操作,或二者都是memory_order_acq_rel
这种内存次序在成对的读写线程之间起到同步作用,释放与获取操作构成同步关系前者写出的值由后者读取

  1. #include <atomic>
  2. #include <cassert>
  3. #include <iostream>
  4. #include <thread>
  5. using namespace std;
  6. std::atomic_bool x, y;
  7. std::atomic_int z;
  8. void write_x() {
  9. x.store(true, std::memory_order_release);
  10. }
  11. void write_y() {
  12. y.store(true, std::memory_order_release);
  13. }
  14. void read_x_then_y() {
  15. while (!x.load(std::memory_order_acquire))
  16. ;
  17. if (y.load(std::memory_order_acquire))
  18. ++z;
  19. }
  20. void read_y_then_x() {
  21. while (!y.load(std::memory_order_acquire))
  22. ;
  23. if (x.load(std::memory_order_acquire))
  24. ++z;
  25. }
  26. auto main() -> int {
  27. x = false;
  28. y = false;
  29. z = 0;
  30. std::thread a(write_x);
  31. std::thread b(write_y);
  32. std::thread c(read_x_then_y);
  33. std::thread d(read_y_then_x);
  34. a.join();
  35. b.join();
  36. c.join();
  37. d.join();
  38. assert(z.load() != 0);
  39. }

image.png

  1. #include <atomic>
  2. #include <cassert>
  3. #include <thread>
  4. using namespace std;
  5. std::atomic_bool x, y;
  6. std::atomic_int z;
  7. void write_x_then_y() {
  8. x.store(true, std::memory_order_relaxed); // 1
  9. y.store(true, std::memory_order_release); // 2
  10. }
  11. void read_y_then_x() {
  12. while (!y.load(std::memory_order_acquire)) // 3
  13. ;
  14. if (x.load(std::memory_order_relaxed)) // 4
  15. ++z;
  16. }
  17. auto main() -> int {
  18. x = false;
  19. y = false;
  20. z = 0;
  21. std::thread a(write_x_then_y);
  22. std::thread b(read_y_then_x);
  23. a.join();
  24. b.join();
  25. assert(z.load() != 0); // 5
  26. }

image.png

获取-释放次序可用于多线程之间数据的同步,即使“过渡线程”的操作不涉及目标数据,也照样可行。
再看原书P163附近。

  1. std::atomic<int> data[5];
  2. std::atomic<bool> sync1(false),sync2(false);
  3. void thread_1()
  4. {
  5. data[0].store(42,std::memory_order_relaxed);
  6. data[1].store(97,std::memory_order_relaxed);
  7. data[2].store(17,std::memory_order_relaxed);
  8. data[3].store(-141,std::memory_order_relaxed);
  9. data[4].store(2003,std::memory_order_relaxed);
  10. sync1.store(true,std::memory_order_release); // 1.设置sync1
  11. }
  12. void thread_2()
  13. {
  14. while(!sync1.load(std::memory_order_acquire)); // 2.直到sync1设置后,循环结束
  15. sync2.store(true,std::memory_order_release); // 3.设置sync2
  16. }
  17. void thread_3()
  18. {
  19. while(!sync2.load(std::memory_order_acquire)); // 4.直到sync1设置后,循环结束
  20. assert(data[0].load(std::memory_order_relaxed)==42);
  21. assert(data[1].load(std::memory_order_relaxed)==97);
  22. assert(data[2].load(std::memory_order_relaxed)==17);
  23. assert(data[3].load(std::memory_order_relaxed)==-141);
  24. assert(data[4].load(std::memory_order_relaxed)==2003);
  25. }

image.png

5.3.4 释放序列和同步关系

对于同一个原子变量,可以在线程 A 上对其执行存储操作,在线程 B 上对其执行载入操作,从而构成同步关系。即使存储和读取之间还存在着多个“读-改-写”操作,同步关系依然成立,前提是所有操作都采取了合适的内存序。

如果存储操作的标记是memory_order_releasememory_acq_relmemory_order_seq_cst标记,而载入操作以memory_order_consumememory_order_acquirememory_order_seq_cst标记,这些操作前后相扣,每次载入操作的值都源自前面的存储操作,那么该操作链由一个释放序列组成

  1. #include <atomic>
  2. #include <iostream>
  3. #include <thread>
  4. #include <vector>
  5. std::vector<int> queue_data;
  6. std::atomic<int> count;
  7. void populate_queue() {
  8. unsigned const number_of_items = 20;
  9. queue_data.clear();
  10. for (unsigned i = 0; i < number_of_items; ++i)
  11. queue_data.push_back(i);
  12. count.store(number_of_items, std::memory_order_release);//1.最初的存储操作
  13. }
  14. void consume_queue_items() {
  15. while (true) {
  16. int item_index;
  17. //2.一项“读-改-写”操作
  18. if ((item_index = count.fetch_sub(1, std::memory_order_acquire)) <= 0) {
  19. ; //3.等待队列容器装入新数据
  20. continue;
  21. }
  22. //4.从内部容器读取数据项是安全的
  23. queue_data[item_index - 1];
  24. }
  25. }
  26. auto main() -> int {
  27. std::thread a(populate_queue);
  28. std::thread b(consume_queue_items);
  29. std::thread c(consume_queue_items);
  30. a.join();
  31. b.join();
  32. c.join();
  33. }

虚线表示释放序列,实线表示先行关系:
image.png

5.3.5栅栏

  1. #include <iostream>
  2. #include <atomic>
  3. #include <cassert>
  4. #include<thread>
  5. using namespace std;
  6. std::atomic_bool x, y;
  7. std::atomic_int z;
  8. void write_x_then_y() {
  9. x.store(true, std::memory_order_relaxed);
  10. std::atomic_thread_fence(std::memory_order_release);
  11. y.store(true, std::memory_order_relaxed);
  12. }
  13. void read_y_then_x() {
  14. while (!y.load(std::memory_order_relaxed))
  15. ;
  16. std::atomic_thread_fence(std::memory_order_acquire);
  17. if(x.load(std::memory_order_relaxed))
  18. ++z;
  19. }
  20. auto main()->int {
  21. x = false;
  22. y = false;
  23. z = 0;
  24. std::thread a(write_x_then_y);
  25. std::thread b(read_y_then_x);
  26. a.join();
  27. b.join();
  28. assert(z.load() != 0);
  29. }

5.3.6 对非原子变量的操作排序

  1. #include <atomic>
  2. #include <thread>
  3. #include <assert.h>
  4. #include<iostream>
  5. using namespace std;
  6. bool x=false; // x现在是一个非原子变量
  7. std::atomic<bool> y;
  8. std::atomic<int> z;
  9. void write_x_then_y() {
  10. x=true; // 1 在栅栏前存储x
  11. std::atomic_thread_fence(std::memory_order_release);
  12. y.store(true,std::memory_order_relaxed); // 2 在栅栏后存储y
  13. }
  14. void read_y_then_x() {
  15. while(!y.load(std::memory_order_relaxed)); // 3 在#2写入前,持续等待
  16. std::atomic_thread_fence(std::memory_order_acquire);
  17. if(x) // 4 这里读取到的值,是#1中写入
  18. ++z;
  19. }
  20. auto main()->int {
  21. x=false;
  22. y=false;
  23. z=0;
  24. std::thread a(write_x_then_y);
  25. std::thread b(read_y_then_x);
  26. a.join();
  27. b.join();
  28. assert(z.load()!=0); // 5 断言将不会触发
  29. }

使用一个普通的非原子变量bool类型,行为和之前完全一样。

推荐博客链接