C++ 线程如何优雅退出(执行清理操作)

多线程程序中, 经常会定时执行任务. 通常的做法是, 在 while 循环中执行一个 task, 然后 sleep 一段时间. 如下:

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051 #include #include #include #include #include bool is_stopped = false;void nanotask() { struct timespec interval; interval.tv_sec = 10; interval.tv_nsec = 0; while(!is_stopped) { std::cout << “nanosleeping” << std::endl; nanosleep(&interval, NULL); std::cout << “wake up” << std::endl; }}void task() { while(!is_stopped) { std::cout << “sleeping” << std::endl; // sleep 底层实现还是 nanosleep sleep(10); std::cout << “wake up” << std::endl; } std::cout << “cleanup” << std::endl;}void handler(int sig) { std::cout << “got signal:” << sig << std::endl; // 虽然设置了is_stopped = true, 但是要等到 sleep 返回, 程序才能结束 is_stopped = true;}int main() { // 谨慎使用 signal, 尽量使用 sigaction struct sigaction sa; sa.sa_handler = handler; sigemptyset(&sa.sa_mask); sa.sa_flags = 0; sigaction(SIGTERM, &sa, NULL); sigaction(SIGINT, &sa, NULL); std::thread t1(task); std::thread t2(nanotask); t1.join(); t2.join(); std::cout << “thread exit” << std::endl; return 0;}

这段程序用 sleep 或 nanosleep 作为时间间隔, 并监听 SIGINT(ctrl + c) 和 SIGTERM(kill ) 两个信号. 但有以下几个问题:

  1. sleep 和 nanosleep 无法被唤醒, 所以程序再接收到 SIGINT 或 SIGTERM 之后必须等待sleep 正常返回才能执行后面的 cleanup 代码.
  2. 如果不监听 SIGINT 或 SIGTERM, 那么系统会执行默认的 handler, 并终止程序, 这会 interrupt sleep 函数, 但也会导致 cleanup 代码被跳过.
  3. sleep 是linux 系统调用(包含在 unistd.h 中), 其实现与平台相关的, 所以可移植性不好. 所以有些实现是用可移植的 select 函数代替 sleep. 但这同样会面临无法被唤醒的问题.

更好的实现方式是利用C++11 中的 mutex 和 condition_variable. 利用condition_variable::wait_for 实现可 interruptible 的 sleep 功能. 正常情况下 wait_for 超时, 接收到退出信号之后, 程序会立即被唤醒, 退出 while 循环, 并执行 cleanup 代码.

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556 #include #include #include #include #include #include
class InterruptibleSleeper {public: // returns false if killed: template bool wait_for(std::chrono::duration const& time ) { std::unique_lock lock(m); return !cv.wait_for(lock, time, [&]{return terminate;}); }
void interrupt() { std::unique_lock lock(m); terminate = true; cv.notify_all(); }private: std::condition_variable cv; std::mutex m; bool terminate = false;};
InterruptibleSleeper sleeper;
void task() { while(sleeper.wait_for(std::chrono::milliseconds(30000))) { std::cout << “working” << std::endl; } std::cout << “cleanup” << std::endl;}
void handler(int sig) { std::cout << “got signal:” << sig << std::endl; // 执行 interrupt 之后, 休眠中的线程会被唤醒, 并执行 cleanup 操作 sleeper.interrupt();}
int main() { // 谨慎使用 signal, 尽量使用 sigaction struct sigaction sa; sa.sa_handler = handler; sigemptyset(&sa.sa_mask); sa.sa_flags = 0; sigaction(SIGTERM, &sa, NULL); sigaction(SIGINT, &sa, NULL);
std::thread t(task); t.join(); std::cout << “thread exit” << std::endl; return 0;}

Reference :


std::lock_guard or std::unique_lock
https://github.com/forhappy/Cplusplus-Concurrency-In-Practice/blob/master/zh/chapter4-Mutex/4.3%20Lock-tutorial.md

std::lock_guard

  1. #include <iostream>
  2. #include <sstream>
  3. #include <map>
  4. #include <string>
  5. #include <chrono>
  6. #include <thread>
  7. #include <mutex>
  8. #include <vector>
  9. std::map<std::string, std::string> g_pages; // not thread-safety
  10. std::mutex g_pages_mutex;
  11. void save_page(const std::string &url)
  12. {
  13. // simulate a long page fetch
  14. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  15. std::string result = "fake content";
  16. std::lock_guard<std::mutex> guard(g_pages_mutex);
  17. // std::lock_guard implements below lock & unlock
  18. // g_pages_mutex.lock();
  19. std::cout << "save page for " << url << std::endl;
  20. g_pages[url] = result;
  21. // g_pages_mutex.unlock();
  22. }
  23. int main()
  24. {
  25. std::vector<std::thread> thread_pool;
  26. for (int i = 0; i < 10; i++) {
  27. std::stringstream ss;
  28. std::string url;
  29. ss << "url:" << i << std::endl;
  30. ss >> url;
  31. thread_pool.emplace_back(save_page, url);
  32. //thread_pool[i].detach();
  33. }
  34. std::this_thread::sleep_for(std::chrono::seconds(2));
  35. // safe to access g_pages without lock now, as the threads are joined
  36. for (const auto &pair : g_pages) {
  37. std::cout << pair.first << " => " << pair.second << '\n';
  38. }
  39. // If exit now program will crash
  40. // cleanup threads manually, otherwise detach
  41. for(std::thread& t : thread_pool) {
  42. t.join();
  43. }
  44. }

std::unique_lock

  1. #include <iostream> // std::cout
  2. #include <thread> // std::thread
  3. #include <mutex> // std::mutex, std::lock, std::unique_lock
  4. // std::adopt_lock, std::defer_lock
  5. std::mutex foo,bar;
  6. void task_a () {
  7. std::lock (foo,bar); // simultaneous lock (prevents deadlock)
  8. std::unique_lock<std::mutex> lck1 (foo,std::adopt_lock);
  9. std::unique_lock<std::mutex> lck2 (bar,std::adopt_lock);
  10. std::cout << "task a\n";
  11. // (unlocked automatically on destruction of lck1 and lck2)
  12. }
  13. void task_b () {
  14. // foo.lock(); bar.lock(); // replaced by:
  15. std::unique_lock<std::mutex> lck1, lck2;
  16. lck1 = std::unique_lock<std::mutex>(bar,std::defer_lock);
  17. lck2 = std::unique_lock<std::mutex>(foo,std::defer_lock);
  18. std::lock (lck1,lck2); // simultaneous lock (prevents deadlock)
  19. std::cout << "task b\n";
  20. // (unlocked automatically on destruction of lck1 and lck2)
  21. }
  22. int main ()
  23. {
  24. std::thread th1 (task_a);
  25. std::thread th2 (task_b);
  26. th1.join();
  27. th2.join();
  28. return 0;
  29. }

std::ref
在函数绑定的情况下可以用到。

  1. void f(int& n1, int& n2, const int& n3)
  2. {
  3. std::cout << "In function: " << n1 << ' ' << n2 << ' ' << n3 << '\n';
  4. ++n1; // increments the copy of n1 stored in the function object
  5. ++n2; // increments the main()'s n2
  6. // ++n3; // compile error
  7. }
  8. int main()
  9. {
  10. int n1 = 1, n2 = 2, n3 = 3;
  11. std::function<void()> bound_f = std::bind(f, n1, std::ref(n2), std::cref(n3));
  12. n1 = 10;
  13. n2 = 11;
  14. n3 = 12;
  15. std::cout << "Before function: " << n1 << ' ' << n2 << ' ' << n3 << '\n';
  16. bound_f();
  17. std::cout << "After function: " << n1 << ' ' << n2 << ' ' << n3 << '\n';
  18. }

std::thread

  1. void worker_thread(void) {
  2. int i = 10;
  3. while (i--) {
  4. std::cout << "Thread 2 executing\n";
  5. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  6. }
  7. }
  8. void f1(int n)
  9. {
  10. for (int i = 0; i < 5; ++i) {
  11. std::cout << "Thread 3 executing\n";
  12. ++n;
  13. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  14. }
  15. }
  16. void f2(int& n)
  17. {
  18. for (int i = 0; i < 5; ++i) {
  19. std::cout << "Thread 4 executing\n";
  20. ++n;
  21. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  22. }
  23. }
  24. class foo
  25. {
  26. public:
  27. void bar()
  28. {
  29. for (int i = 0; i < 5; ++i) {
  30. std::cout << "Thread 5 executing\n";
  31. ++n;
  32. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  33. }
  34. }
  35. int n = 0;
  36. };
  37. class baz
  38. {
  39. public:
  40. void operator()()
  41. {
  42. for (int i = 0; i < 5; ++i) {
  43. std::cout << "Thread 6 executing\n";
  44. ++n;
  45. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  46. }
  47. }
  48. int n = 0;
  49. };
  50. int main(int argc, const char * argv[]) {
  51. int n = 0;
  52. foo f;
  53. baz b;
  54. std::thread t1; //t1 is not a thread
  55. std::thread t2(worker_thread);
  56. std::thread t3(f1, n);
  57. std::thread t4(f2, std::ref(n));
  58. std::thread t5(std::move(t3)); //t5 is now running f2(). t3 is no longer a thread
  59. std::thread t6(&foo::bar, &f);
  60. std::thread t7(b);
  61. // t2.join();
  62. // t4.join();
  63. // t5.join();
  64. // t6.join();
  65. // t7.join();
  66. // simple runloop
  67. while (true) {
  68. std::this_thread::sleep_for(std::chrono::seconds(1));
  69. std::cout << "Final value of n is " << n << '\n';
  70. }
  71. return 0;
  72. }

上面的代码,std::thread t4(f2, std::ref(n)); 如果改为std::thread t4(f2, n); 会报错
Screen Shot 2019-11-07 at 12.53.39 PM.png

当join()一个没有绑定函数的thread对象,程序会崩溃
Screen Shot 2019-11-07 at 12.59.31 PM.png

如果不调用join,父线程退出了,也会造成崩溃。
Screen Shot 2019-11-07 at 1.10.29 PM.png