4.1 等待事件或等待其他条件

如果线程A 需要等待线程B 完成任务,可以采取几种不同方式:

  • 方式一:在共享数据内部维护一个标志(受互斥保护),线程B 完成任务后,就设置标志成立。

该方式存在双重浪费:

  1. 线程A 必须不断查验标志,浪费原本有用的处理时间,会限制线程B 的可用算力;
  2. 一旦互斥被锁住,其他任何线程都无法再加锁。线程A 每次查验标志,都要锁住互斥量以施加保护,那么此时若线程B 刚好同时完成任务,即想要设置标志成立,则无法对互斥加锁。
  • 方式二:让线程A 调用**std::this_thread::sleep_for()**函数,在各次查验之间短期休眠

    1. bool flag;
    2. std::mutex m;
    3. void wait_for_flag(){
    4. std::unique_lock<std::mutex>lk(m);
    5. while(!flag){
    6. lk.unlock();
    7. std::this_thread::sleep_for(std::chrono::milliseconds(100));
    8. lk.lock();
    9. }
    10. }

    然而休眠时间的长短难以预知,太短则线程仍然会频繁查验,消耗处理时间;太长则令线程过度休眠,如果线程B 完成了任务,线程A 却没有被及时唤醒,就会导致延迟。

  • 方式三:使用C++标准库的工具等待事件发生——条件变量

4.1.1 凭借条件变量等待条件成立

C++标准库提供了两种条件变量的实现:std::condition_variablestd::condition_variable_any都需要配合互斥量才能提供同步操作
std::condition_variable仅限与std::mutex一起使用;然而只要某一类型符合成为互斥的最低标准,足以充当互斥std::condition_variable_any就可以与之配合使用,因此其后缀是_any。由于后者更佳通用,可能产生额外开销,涉及其性能、自身体积或系统资源等,所以前者应该优先采用。

std::condition_variablewait()函数有第二个可选参数,接受一个bool类型,当值为false时就会被阻塞在这里,只有当该线程被唤醒后,且第二参数为true时才会往下运行。

std::condition_variable必须和std::unique_lock搭配而不能和std::lock_guard搭配的原因在于:线程A 在等待期间,必须解锁互斥,而等待结束之后,必须重新加锁,但是**std::lock_guard**不能提供这种灵活性
假设线程A 在休眠时,互斥依然被锁住,那么即使线程B 备妥了数据,也不能锁住互斥,也就无法将其添加到队列中。结果就是线程A 等待的条件永远无法成立,会无休止地等待下去。

wait()调用期间,条件变量可以多次查验给定条件次数不受限制在查验时互斥量总会被锁。另外,当且仅当传入的判定函数返回 true 时,**wait()**才会立即返回
如果线程A 重新获得互斥,并且查验条件,而这一行为却不是直接响应线程B 的通知,则称之为虚假唤醒。这种虚假唤醒出现的数量和频率不也得,因此,若判定函数有副作用,则不建议用作查验条件。
e.g. 每次被调用时,判定函数就 顺带提高所属线程的优先级,该提升动作即产生的副作用。结果,多次伪唤醒可“意外 地”令线程优先级变得非常高。

std::condition_variable::wait()本质上是忙等的优化,下列代码是wait()一种合法实现,仅使用了一个简单地循环,但效率不尽如人意:

  1. template<typename Predicate>
  2. void minimal_wait(std::unique_lock<std::mutex>& lk, Predicate pred){
  3. while(!pred){
  4. lk.unlock();
  5. lk.lock();
  6. }
  7. }

在线程之间传递数据常见的方法就是运用队列。若队列实现到位,同步操作就可以呗限制在内部,从而大幅减少可能出现的同步问题和竞态条件。

4.1.2 利用条件变量构建线程安全的队列

由于接口之间存在固有的竞态条件,所以需要吧front()pop()合并成一个函数。
而当队列用于线程间数据传递时,负责接收的线程尝尝需要等待数据压入。所以对外提供pop()的两个变体:try_pop()wait_and_pop():它们都试图弹出队首元素,前者总是立即返回,即便队列内没有元素;后者会一直等到有数据压入可供获取。

  1. #include <iostream>
  2. #include <thread>
  3. #include <condition_variable>
  4. #include <mutex>
  5. #include <memory>
  6. #include <queue>
  7. template<typename T>
  8. class threadsafe_queue{
  9. private:
  10. mutable std::mutex mut;
  11. std::queue<T> data_queue;
  12. std::condition_variable data_cond;
  13. public:
  14. threadsafe_queue(){}
  15. threadsafe_queue(threadsafe_queue const& other){
  16. std::lock_guard<std::mutex>lk(other.mut);
  17. data_queue = other.data_queue;
  18. }
  19. void push(T new_value){
  20. std::lock_guard<std::mutex>lk(mut);
  21. data_queue.push(new_value);
  22. data_cond.notify_one();
  23. }
  24. void wait_and_pop(T& value){
  25. std::unique_lock<std::mutex> lk(mut);
  26. data_cond.wait(lk, [this]{return !data_queue.empty();});
  27. value = data_queue.front();
  28. data_queue.pop();
  29. }
  30. std::shared_ptr<T> wait_and_pop(){
  31. std::unique_lock<std::mutex> lk(mut);
  32. data_cond.wait(lk, [this]{return !data_queue.empty();});
  33. std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
  34. data_queue.pop();
  35. return res;
  36. }
  37. bool try_pop(T& value){
  38. std::lock_guard<std::mutex>lk(mut);
  39. if(data_queue.empty())
  40. return false;
  41. value = data_queue.front();
  42. data_queue.pop();
  43. return true;
  44. }
  45. std::shared_ptr<T> try_pop(){
  46. std::lock_guard<std::mutex>lk(mut);
  47. if(data_queue.empty())
  48. return std::shared_ptr<T>();
  49. std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
  50. data_queue.pop();
  51. return res;
  52. }
  53. bool empty() const{
  54. std::lock_guard<std::mutex> lk(mut);
  55. return data_queue.empty();
  56. }
  57. };
  1. #include"threadsafe_queue.h"
  2. #include<vector>
  3. using namespace std;
  4. threadsafe_queue<int>tq{};
  5. vector<int>vec{};
  6. void pushIntoQueue(){
  7. for(int i = 0; i < 50; ++i){
  8. tq.push(i);
  9. }
  10. }
  11. void popFromQueue(){
  12. int i = 50;
  13. while(i--){
  14. vec.push_back(*tq.wait_and_pop());
  15. }
  16. }
  17. auto main()->int{
  18. std::thread t1(popFromQueue);
  19. std::thread t2(pushIntoQueue);
  20. t1.join();
  21. t2.join();
  22. for(auto i : vec){
  23. cout<<i<<" ";
  24. }
  25. return 0;
  26. }

然而,假定某个线程按计划仅仅等待一次,只要条件成立一次,它就不再理会条件变量。条件变量未必是这种同步模式的最佳选择。若我们所等待的条件需要判定某份数据是否可用,那么future更适合此场景。

4.2 使用 future 等待一次性事件发生

C++标准库使用future模拟一次性事件:若线程需等待某个特定的一次性事件发生,则会以恰当的方式得到一个**future**。其代表目标事件,接着该线程就能一边执行其他任务,一边在**future**上等待,同时它以短暂的间隔反复查验目标事件是否已经发生

C++有两种future,分别由两个类模板实现:

  • 独占**future**:即std::future<>同一事件仅允许关联唯一一个**std::future**实例
  • 共享**future**:即std::shared_future<>同一事件允许关联多个**std::shared_future**实例。只要目标事件发生,与后者关联的所有实例就会同时就绪,并且全都可以访问与该目标事件关联的任何数据。

关联数据就是两种future以模板形式实现的原因:模板参数就是关联数据的类型。如果没有关联数据,就应该使用特化的模板std::future<**void**>std::shared_future<**void**>

虽然**future**可以用于线程间通信,但是**future**本身不提供同步访问。若多个线程需访问同一个future对象,必须使用互斥或其他同步方式保护。
一个**std::shared_future<>**对象可能派生出多个副本,这些副本都指向同一个异步结果,由多个线程分别独占,它们可以访问属于自己的那个副本而无须互相同步。

最基本的一次性事件就是后台运行的计算任务完成,得出结果。**std::thread**并不能提供简洁的方式从计算任务返回求得的值。现在可以利用**future**来获得

4.2.1 从后台任务返回值

只要不急需线程运算的值就可以使用**std::async()**按异步方式启动任务。从std::async()函数处获得std::future对象,运行的函数一旦完成,其返回值就由该对象最后持有。若要用到这个值,只需要**future**对象上调用**get()**当前线程就会阻塞,以便future准备妥当并返回该值。

  1. #include<future>
  2. #include<iostream>
  3. using namespace std;
  4. int find_answer_to_ltuae(){
  5. return 100;
  6. }
  7. void do_other_stuff(){
  8. std::cout<<"do_other_stuff ing......"<<std::endl;
  9. }
  10. auto main()->int{
  11. std::future<int> the_answer = std::async(find_answer_to_ltuae);
  12. do_other_stuff();
  13. std::cout<<"The answer is: "<<the_answer.get()<<std::endl;
  14. }

std::thread的构造函数相同,std::async可以接受附加参数,进而传递给任务函数作为其参数。若要异步运行某个类的成员函数,则**std::async**的第一个参数应该是一个函数指针指向该类的目标成员函数,第二个参数需要给出相应的对象(可以是指向对象的指针、对象本身或由std::ref包装的对象)。
HINT如果**std::async()**的参数是右值,则通过移动原始参数构建副本,与复制std::thread实例相同。
Example 1:
image.png
Example 2:
image.png

默认情况下,std::async()的具体实现会自行决定——等待**future**时,是启动新线程,还是同步执行任务。大多数情况下,这样就可以了。
不过为了更加可控,可以**std::async()**补充一个参数,作为第一参数,指定采用哪种运行方式。参数类型是std::launch,其值可以是:

  • std::launch::deferred,指定在当前线程上延后调用任务函数等到在**future**上调用了**wait()****get()**,任务函数才会执行
  • std::launch::async指定必须另外开启专属的线程在其上运行任务函数

若延后调用任务函数,则任务函数可能永远不会运行
image.png

使std::future和任务关联并非唯一的方法:

  • 运用类模板**std::packaged_task<>**的实例,也能将任务包装起来;
  • 或者利用**std::promise<>**类模板显式地异步求值

4.2.2 关联 future 实例和任务

**std::packaged_task<>**连结了**future**对象与函数(或可调用对象)std::packaged_task<>对象在执行任务时,会调用关联的函数(或可执行对象),把返回值保存为**future**的内部数据,并令future准备就绪。
如果一项操作能分解为多个子任务,则可以把它们分别包装到多个std::packaged_task<>实例中,再传递给任务调度器或线程池。这就隐藏了细节,使任务抽象化,让调度器可以专注处理std::packaged_task<>实例,无须纠缠于各种不同的任务函数。

std::packaged_task<>是类模板,其模板参数是函数签名。但也不必严格匹配, 若某函数接收int类型参数并返回float值,我们则可以为其构建std::packaged_ task<double(double)>的实例,因为对应的类型可进行隐式转换
类模板std::packaged_task<>具有成员函数get_future(), 返回std::future<>实例。**future**特化类型取决于函数签名所指定的返回值
std::packaged_task<>还具备函数调用操作符其参数取决于函数签名的参数列表

  1. template<>
  2. class packaged_task<std::string(std::vector<char>*,int)> {
  3. public:
  4. template<typename Callable>
  5. explicit packaged_task(Callable&& f);
  6. std::future<std::string> get_future();
  7. void operator()(std::vector<char>*,int);
  8. };

**std::packaged_task<>**对象是可调用对象,可以直接调用,还可以将其包装在std::function对象内,当做线程函数传递给std::thread对象,也可以传递给需要可调用对象的函数。

std::packaged_task<>作为函数对象而被调用,就会通过函数调用操作符接收参数,并将其进一步传递给包装在内地的任务函数,由其异步运行得出结果,并将结果保存到**std::future**对象内部再通过**get_future()**获取此对象
因此,为了在未来的适当时刻执行某项任务,可以将其包装在std::packaged_task<>对象内,取得对应的**future**之后,才把该对象传递给其他线程,由它触发任务执行。等到需要使用结果时,等待**future**准备就绪即可

线程间传递任务
许多 GUI 框架都设立了专门的线程,作为更新界面的实际执行者。若别的线程需要更新界面,就必须向它发送消息。由它执行对应操作。
该模式可以使用std::packaged_task实现:

  1. #include<iostream>
  2. #include<deque>
  3. #include<mutex>
  4. #include<future>
  5. #include<thread>
  6. #include<utility>
  7. using namespace std;
  8. std::mutex m;
  9. std::deque<std::packaged_task<void()>>tasks;
  10. bool gui_shutdown_message_received();
  11. void get_and_process_gui_message();
  12. void gui_thread(){ //1
  13. while(!gui_shutdown_message_received()){ //2
  14. get_and_process_gui_message(); //3
  15. std::packaged_task<void()>task;
  16. {
  17. std::lock_guard<std::mutex>lk(m);
  18. if(tasks.empty()) //4
  19. continue;
  20. task = std::move(tasks.front()); //5
  21. tasks.pop_front();
  22. }
  23. task(); //6
  24. }
  25. }
  26. std::thread gui_bg_thread(gui_thread);
  27. template<typename Func>
  28. std::future<void> post_task_for_gui_thread(Func f){
  29. std::packaged_task<void()> task(f); //7
  30. std::future<void>res = task.get_future(); //8
  31. std::lock_guard<std::mutex> lk(m);
  32. tasks.push_back(std::move(task)); //9
  33. return res; //10
  34. }

在 GUI 线程上①,轮询任务队列和待处理的界面消息(如用户的单击)③;若有消息指示界面关闭,则循环终止②。假如任务队列一无所有,则循环继续④;否则,就从中取出任务⑤,释放任务队列上的锁,随即运行该任务⑥。在任务完成时,与它关联的 future 会进入就绪状态。

向任务队列布置任务也很简单。依据给定的函数创建新任务,将任务包装在内⑦,并随即通过调用成员函数get_future(),取得与该任务关联的future⑧,然后将任务放入任务队列⑨,接着向 post_task_for_gui_thread()的调用者返回future⑩。接下来,有关代码向 GUI 线程投递消息,假如这些代码需判断任务是否完成,以获取结果进而采取后续操作,那么只要等待future就绪即可;否则,任务的结果不会派上用场,关联的future可被丢弃。

有些任务无法以简单地函数调用表达出来。还有一些任务的执行结果可能来自多个部分的代码。这时候需要采用第三种方法:借助**std::promise**显式异步求值

4.2.3 创建 std::promise

假设有个应用需要处理大量网络连接,一开始可能倾向于运用多个独立线程。一对一地处理各个连接,原因是这能简化网络通信的构思,程序编写也相对容易。 如果连接数量较少(因而线程数量也少),此方式行之有效 ; 随着连接数量攀升,它就力不从心了。过多线程导致消耗巨量系统资源,一旦线程数量超出硬件所支持的并发任务数量,还可能引起繁重的上下文切换,影响性能。极端情况下,在网络连接超出负 荷之前,操作系统就可能已经先耗尽别的资源,无法再运行新线程。故此,若应用要处理大量网络连接,通常交由少量线程负责处理(可能只有一个),每个线程同时处理多个连接

std::promise<T>给出一种异步求值的方法(类型为 T),某个**std::future<T>**对象与结果关联。能延后读出需要求取的值。
配对的std::promise<T>std::future<T>可以实现:等待数据的线程在**future**上阻塞,而提供数据的线程利用相配的**promise**设定关联的值使**future**准备就绪

若需从给定的std:promise实例获取关联的std::future对象,调用前者的成员函数**get_future()**即可**promise**的值通过成员函数**set_value()**设置,只要设置好,future即准备就绪,凭借它就能获取该值。
如果std::promise被销毁时仍未设置值,保存的数据则由异常代替。

4.2.5 多个线程一起等待

只要同步操作是一对一地在线程传递数据,std::future都能处理。然而对于某个std::future实例,如果其成员函数由不同线程调用,却不会自动同步。
这是std::future特性:模拟了对异步结果的独占行为**get()**仅能被有效调用一次只有一个线程能获取目标值,原因是第一次调用**get()**会进行移动操作,之后该值不复存在
image.png

std::shared_future可以让多个线程等待同一个目标事件std::future仅能移动构造移动赋值,所以归属权可以在多个实例之间转移,但是在相同时刻只有唯一一个**future**实例指向特定的异步结果
image.png
std::shared_future的实例则可以复制出副本,因此可以持有该类的多个对象全部指向同一异步任务的状态数据

即使改用**std::shared_future**,同一个对象的成员函数依然没有同步。如果从多个线程访问同一个对象,就必须采用锁保护。
首选方式是:向每个线程传递**std::shared_future**对象的副本它们为各线程独自所有并被视作局部变量。这样这些副本就作为各线程的内部数据,由标准库正确同步。 若多个线程共享异步状态 ,只要它们通过自有的std::shared_future对象读取状态数据,则该访问行为是安全的 :
Version 1:
image.png
Version 2:
image.png

futurepromise都具备成员函数valid(),用于判断异步状态是否有效

**std::shared_future**的实例依据**std::future**的实例构造得来,前者所指向的异步状态由后者决定。因为**std::future**对象独占异步状态,其归属权不被其他任何对象所共有。所以若要按默认方式构造**std::shared_future**对象,则须用**std::move**向其默认构造函数传递归属权,这使**std::futrue**变成空状态

  1. std::promise<int> p;
  2. std::future<int> f(p.get_future());
  3. assert(f.valid()); //future 对象 f 有效
  4. std::shared_future<int> sf(std::move(f)); //对象 f 不再有效
  5. assert(!f.valid());
  6. assert(sf.valid()); //对象 sf 开始生效

而且std::future拥有成员函数share(),直接创建新的std::shared_future对象,并向它转移归属权:

  1. std::promise<std::map<int,int>::iterator> p;
  2. auto sf = p.get_future().share();

future、async、promise、packaged_task详解

std::future

image.png
image.png
image.png
image.png
HINT:鼓励在调用前监测valid()false的情况。
image.png
其实这里的意思应该是关联,关联到一个实例的意思。

  1. #include<iostream>
  2. #include<future>
  3. #include<thread>
  4. using namespace std;
  5. int main(void){
  6. std::promise<void> p{};
  7. std::future<void> f = p.get_future();
  8. std::cout<<std::boolalpha;
  9. std::cout<<f.valid()<<std::endl;
  10. p.set_value();
  11. std::cout<<f.valid()<<std::endl;
  12. f.get();
  13. std::cout<<f.valid()<<std::endl;
  14. }
  15. //result
  16. true
  17. true
  18. false

image.png
future代码实例:

  1. #include <future>
  2. #include <iostream>
  3. #include <thread>
  4. using namespace std;
  5. int main(void) {
  6. std::packaged_task<int()> task([]() { return 7; }); //包装函数
  7. std::future<int> f1 = task.get_future(); //获取future
  8. std::thread(std::move(task)).detach(); //在线程上运行
  9. //来自 async() 的 future
  10. std::future<int> f2 = std::async(std::launch::async, []() { return 8; });
  11. //来自 promise() 的 future
  12. std::promise<int> p{};
  13. std::future<int> f3 = p.get_future();
  14. std::thread([&]() { p.set_value_at_thread_exit(9); }).detach();
  15. std::cout<<"Waiting..."<<std::flush;
  16. f1.wait();
  17. f2.wait();
  18. f3.wait();
  19. std::cout << "Done!\nResults are: "
  20. << f1.get() << ' ' << f2.get() << ' ' << f3.get() << '\n';
  21. }

std::shared_future

image.png
image.png
image.png
image.png
HINT:鼓励在get()之前采用valid()检查。
image.png
std::shared_future不同于std::future,它不会在调用get()valid()结果变为false

  1. #include <future>
  2. #include <iostream>
  3. int main() {
  4. std::promise<void> p;
  5. std::shared_future<void> f = p.get_future();
  6. std::cout << std::boolalpha;
  7. std::cout << f.valid() << '\n';
  8. p.set_value();
  9. std::cout << f.valid() << '\n';
  10. f.get();
  11. std::cout << f.valid() << '\n';
  12. }
  13. //reuslt
  14. true
  15. true
  16. true

image.png
在同一个std::shared_future实例上从多个线程调用wait()不安全,应该令每个等待于同一个共享状态上的线程拥有一个**std::shared_future**副本

std::shared_future示例:

  1. #include <chrono>
  2. #include <future>
  3. #include <iostream>
  4. #include <thread>
  5. int main(void) {
  6. std::promise<void> ready_promise, t1_ready_promise, t2_ready_promise;
  7. std::shared_future<void> ready_future(ready_promise.get_future());
  8. std::chrono::time_point<std::chrono::high_resolution_clock> start;
  9. auto fun1 = [&, ready_future]() -> std::chrono::duration<double, std::milli> {
  10. t1_ready_promise.set_value();
  11. ready_future.wait(); // 等待来自 main() 的信号
  12. return std::chrono::high_resolution_clock::now() - start;
  13. };
  14. auto fun2 = [&, ready_future]() -> std::chrono::duration<double, std::milli> {
  15. t2_ready_promise.set_value();
  16. ready_future.wait(); // 等待来自 main() 的信号
  17. return std::chrono::high_resolution_clock::now() - start;
  18. };
  19. auto result1 = std::async(std::launch::async, fun1);
  20. auto result2 = std::async(std::launch::async, fun2);
  21. //等待线程变为就绪
  22. t1_ready_promise.get_future().wait();
  23. t2_ready_promise.get_future().wait();
  24. //线程已就绪,开始时钟
  25. start = std::chrono::high_resolution_clock::now();
  26. ready_promise.set_value();
  27. std::cout << "Thread 1 received the signal "
  28. << result1.get().count() << " ms after start\n"
  29. << "Thread 2 received the signal "
  30. << result2.get().count() << " ms after start\n";
  31. }

std::async

image.png
函数模板async异步地运行函数f()(潜在地在可能是线程池一部分的分离线程中),并返回最终将保有该函数调用结果的std::future

  1. 表现如同以policystd::launch::async | std::launch::deferred调用(2)。即f()可能执行于某一线程也可能在查询产生的**std::future**的值时同步运行
  2. 按照特定的执行策略policy,以参数args调用函数f()
    • 若设置 async 标志(即 (policy & std::launch::async) != 0 ),则**async**在新的执行线程(初始化所有线程局域对象后)执行可调用对象**f()** ,如同产出std::thread(std::forward<F>(f),std::forward<Args>(args)...),除了若f()返回值或抛出异常,则于可通过async返回给调用方的std::future访问的共享状态存储结果。
    • 若设置deferred标志(即 (policy & std::launch::deferred) != 0 ),则async以同std::thread构造函数的方式转换f()args...但不产出新的执行线程。而是进行惰性求值:在async所返回的std::future首次调用非定时等待函数,将导致在当前线程(不必是最初调用std::async的线程)中,以args...(作为右值传递)的副本调用f()(亦作为右值)的副本。将结果或异常置于关联到该future的共享状态,然后才令它就绪。对同一**std::future**的所有后续访问都会立即返回结果
    • policy中设置了std::launch::asyncstd::launch::deferred两个标志,则进行异步执行还是惰性求值取决于实现
    • policy中未设置std::launch::asyncstd::launch::deferred或任何实现定义策略标志,则行为未定义

image.png

  1. #include <algorithm>
  2. #include <future>
  3. #include <iostream>
  4. #include <mutex>
  5. #include <numeric>
  6. #include <string>
  7. #include <vector>
  8. using namespace std;
  9. std::mutex m;
  10. struct X {
  11. void foo(int i, const std::string& str) {
  12. std::lock_guard<std::mutex> lk(m);
  13. std::cout << str << ' ' << i << '\n';
  14. }
  15. void bar(const std::string& str) {
  16. std::lock_guard<std::mutex> lk(m);
  17. std::cout << str << '\n';
  18. }
  19. int operator()(int i) {
  20. std::lock_guard<std::mutex> lk(m);
  21. std::cout << i << '\n';
  22. return i + 10;
  23. }
  24. };
  25. template <typename RandomIt>
  26. int parallel_sum(RandomIt beg, RandomIt end) {
  27. auto len = end - beg;
  28. if (len < 1000)
  29. return std::accumulate(beg, end, 0);
  30. RandomIt mid = beg + len / 2;
  31. auto handle = std::async(std::launch::async,
  32. parallel_sum<RandomIt>, mid, end);
  33. int sum = parallel_sum(beg, mid);
  34. return sum + handle.get();
  35. }
  36. int main(void) {
  37. std::vector<int> v(1000, 1);
  38. std::cout << "The sum is " << parallel_sum(v.begin(), v.end()) << '\n';
  39. X x;
  40. // 以默认策略调用 x.foo(42, "Hello") :
  41. // 可能同时打印 "Hello 42" 或延迟执行
  42. auto a1 = std::async(&X::foo, &x, 42, "Hello");
  43. // 以 deferred 策略调用 x.bar("world!")
  44. // 调用 a2.get() 或 a2.wait() 时打印 "world!"
  45. auto a2 = std::async(std::launch::deferred, &X::bar, x, "world!");
  46. // 以 async 策略调用 X()(43) :
  47. // 同时打印 "43"
  48. auto a3 = std::async(std::launch::async, X(), 43);
  49. a2.wait();
  50. std::cout << a3.get() << '\n';
  51. }

std::packaged_task

image.png
image.png
HINT**std::packaged_task()**只能被移动
image.png
image.png

  1. #include <iostream>
  2. #include <cmath>
  3. #include <thread>
  4. #include <future>
  5. int main()
  6. {
  7. std::packaged_task<int(int,int)> task([](int a, int b) {
  8. return std::pow(a, b);
  9. });
  10. std::future<int> result = task.get_future();
  11. task(2, 9);
  12. std::cout << "2^9 = " << result.get() << '\n';
  13. task.reset();
  14. result = task.get_future();
  15. std::thread task_td(std::move(task), 2, 10);
  16. task_td.join();
  17. std::cout << "2^10 = " << result.get() << '\n';
  18. }

std::packaged_task示例:

  1. #include <cmath>
  2. #include <functional>
  3. #include <future>
  4. #include <iostream>
  5. #include <thread>
  6. using namespace std;
  7. int f(int x, int y) { return std::pow(x, y); }
  8. void task_lambda() {
  9. std::packaged_task<int(int, int)> task([](int a, int b) {
  10. return std::pow(a, b);
  11. });
  12. std::future<int> result = task.get_future();
  13. task(2, 9);
  14. std::cout << "task_lambda:\t" << result.get() << '\n';
  15. }
  16. void task_bind() {
  17. std::packaged_task<int()> task(std::bind(f, 2, 11));
  18. std::future<int> result = task.get_future();
  19. task();
  20. std::cout << "task_bind:\t" << result.get() << '\n';
  21. }
  22. void task_thread() {
  23. std::packaged_task<int(int, int)> task(f);
  24. std::future<int> result = task.get_future();
  25. std::thread task_td(std::move(task), 2, 10);
  26. task_td.join();
  27. std::cout << "task_thread:\t" << result.get() << '\n';
  28. }
  29. int main() {
  30. task_lambda();
  31. task_bind();
  32. task_thread();
  33. }

std::promise

std::promise的作用就是提供一个不同线程之间的数据同步机制,它可以存储一个某种类型的值,并将其传递给对应的future即使这个**future**不在同一个线程中也可以安全的访问到这个值
image.png
image.png
HINT**std::promise**不可复制
image.png
image.png
image.png

  1. #include <algorithm>
  2. #include <cctype>
  3. #include <future>
  4. #include <iostream>
  5. #include <iterator>
  6. #include <sstream>
  7. #include <thread>
  8. #include <vector>
  9. int main() {
  10. std::istringstream iss_numbers{"3 4 1 42 23 -23 93 2 -289 93"};
  11. std::istringstream iss_letters{" a 23 b,e a2 k k?a;si,ksa c"};
  12. std::vector<int> numbers;
  13. std::vector<char> letters;
  14. std::promise<void> numbers_promise, letters_promise;
  15. auto numbers_ready = numbers_promise.get_future();
  16. auto letter_ready = letters_promise.get_future();
  17. std::thread value_reader([&] {
  18. // I/O 操作。
  19. std::copy(std::istream_iterator<int>{iss_numbers},
  20. std::istream_iterator<int>{},
  21. std::back_inserter(numbers));
  22. // 为数字提醒。
  23. numbers_promise.set_value();
  24. std::copy_if(std::istream_iterator<char>{iss_letters},
  25. std::istream_iterator<char>{},
  26. std::back_inserter(letters),
  27. ::isalpha);
  28. // 为字母提醒。
  29. letters_promise.set_value();
  30. });
  31. numbers_ready.wait();
  32. std::sort(numbers.begin(), numbers.end());
  33. if (letter_ready.wait_for(std::chrono::seconds(1)) ==
  34. std::future_status::timeout) {
  35. // 在获得字母的同时输出数
  36. for (int num : numbers)
  37. std::cout << num << ' ';
  38. numbers.clear(); // Numbers were already printed.
  39. }
  40. letter_ready.wait();
  41. std::sort(letters.begin(), letters.end());
  42. // 若已打印数,则不做任何事。
  43. for (int num : numbers)
  44. std::cout << num << ' ';
  45. std::cout << '\n';
  46. for (char let : letters)
  47. std::cout << let << ' ';
  48. std::cout << '\n';
  49. value_reader.join();
  50. return 0;
  51. }

image.png

  1. #include <iostream>
  2. #include <future>
  3. #include <thread>
  4. int main()
  5. {
  6. using namespace std::chrono_literals;
  7. std::promise<int> p;
  8. std::future<int> f = p.get_future();
  9. std::thread([&p] {
  10. std::this_thread::sleep_for(1s);
  11. p.set_value_at_thread_exit(9);
  12. }).detach();
  13. std::cout << "Waiting..." << std::flush;
  14. f.wait();
  15. std::cout << "Done!\nResult is: " << f.get() << '\n';
  16. }

std::promise实例:

  1. #include <vector>
  2. #include <thread>
  3. #include <future>
  4. #include <numeric>
  5. #include <iostream>
  6. #include <chrono>
  7. void accumulate(std::vector<int>::iterator first,
  8. std::vector<int>::iterator last,
  9. std::promise<int> accumulate_promise)
  10. {
  11. int sum = std::accumulate(first, last, 0);
  12. accumulate_promise.set_value(sum); // 提醒 future
  13. }
  14. void do_work(std::promise<void> barrier)
  15. {
  16. std::this_thread::sleep_for(std::chrono::seconds(1));
  17. barrier.set_value();
  18. }
  19. int main()
  20. {
  21. // 演示用 promise<int> 在线程间传递结果。
  22. std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };
  23. std::promise<int> accumulate_promise;
  24. std::future<int> accumulate_future = accumulate_promise.get_future();
  25. std::thread work_thread(accumulate, numbers.begin(), numbers.end(),
  26. std::move(accumulate_promise));
  27. // future::get() 将等待直至该 future 拥有合法结果并取得它
  28. // 无需在 get() 前调用 wait()
  29. //accumulate_future.wait(); // 等待结果
  30. std::cout << "result=" << accumulate_future.get() << '\n';
  31. work_thread.join(); // wait for thread completion
  32. }
  33. //结果
  34. result = 21

4.3 限时等待

有两种超时机制可选:

  • 延迟超时:线程根据指定的时长继续等待:以_for为后缀;
  • 绝对超时:在某特定时间点来临之前,线程一直等待:以_until为后缀

4.3.1 时钟类

C++每种时钟都是一个类,提供 4 项关键信息:

  • 当前时刻;
  • 时间值的类型(从该时钟取得的时间以它为表示形式);
  • 该时钟的计时单元的长度
  • 计时速率是否恒定,即能否将该时钟视为恒稳时钟。

若要获取某时钟类的当前时刻,调用其静态成员函数now()即可。 每个时钟类都具有名为**time_ point**的成员类型(member type),它是该时钟类自有的时间点类。据此,some_clock:: now()的返回值的类型就是some_clock::time_point

时钟类的计时单元属于period的成员类型,表示为秒的分数形式: 若时钟每秒计数 25 次,它的计时单元即为std::ratio<1,25>;若时钟每隔 2.5 秒计数 1 次,则其计时单元为std::ratio<5,2>

时钟类具有静态数据成员is_steady,该值在恒稳时钟内为true,否则为 false。 通常,std::chrono::system_clock类不是恒稳时钟,因为它可调整。即便这种调整自动发生,作用是消除本地系统时钟的偏差,依然可能导致:调用两次now(),后来返回的时间值甚至早于前一个。

4.3.2 时长类

看书,不记笔记了….

4.3.4 接受超时时限的函数

image.png
image.png

4.4 运用同步操作简化代码

从底层运作机制抽身,专注于需要进行同步的操作。一种途径是:在兵法中使用函数式编程的风格,线程间不会直接共享数据,而由各任务预先备妥自己所需的数据,并借助**future**将结果发送到其他有需要的线程

4.4.1 利用 future 进行函数式编程

函数式编程函数调用的结果完全取决于参数,而不依赖于任何外部状态

只要共享数据没有改动,就不会引发竞态条件,因而无需使用互斥。使用future可以更好的实现:future对象可在线程间传递,所以一个计算任务可以依赖于另一个任务的结果,而不必显式访问共享数据

并发执行的快速排序:

  1. // 运用 future 实现并行快速排序
  2. #include <algorithm>
  3. #include <iostream>
  4. #include <list>
  5. #include <future>
  6. using namespace std;
  7. template <typename T>
  8. std::list<T> parallel_quick_sort(std::list<T> input) {
  9. if (input.empty())
  10. return input;
  11. std::list<T> result{};
  12. //剪贴函数,将另外一个 list 中的元素剪贴到本 list 当中
  13. result.splice(result.begin(), input, input.begin());
  14. T const& pivot = *result.begin();
  15. // partition:
  16. // Move elements for which a predicate is true to the beginning of a sequence.
  17. // 这里把链表按照基准值区分开
  18. auto divide_point = std::partition(input.begin(), input.end(),
  19. [&](T const& t) { return t < pivot; });
  20. std::list<T> lower_part{};
  21. lower_part.splice(lower_part.end(), input, input.begin(), divide_point);
  22. std::future<std::list<T>> new_lower(std::async(&parallel_quick_sort<T>, std::move(lower_part)));
  23. auto new_higher(
  24. parallel_quick_sort(std::move(input)));
  25. result.splice(result.end(), new_higher);
  26. result.splice(result.begin(), new_lower.get());
  27. return result;
  28. }
  29. int main(void){
  30. std::list<int> lst{9, 12, 1, 6, 2, 16};
  31. auto tmp = parallel_quick_sort(lst);
  32. for (auto i : tmp)
  33. cout << i << ' ';
  34. std::cout << std::endl;
  35. return 0;
  36. }