C++ 线程如何优雅退出(执行清理操作)
多线程程序中, 经常会定时执行任务. 通常的做法是, 在 while 循环中执行一个 task, 然后 sleep 一段时间. 如下:
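(原文此处的示例代码在抓取时丢失, 仅残留行号和 #include; 以下是根据上下文重建的等价示例.)
#include <signal.h>
#include <unistd.h>

#include <atomic>
#include <iostream>
#include <thread>

std::atomic<bool> g_running{true};

void task() {
    while (g_running) {
        std::cout << "working" << std::endl;
        sleep(30);  // fixed interval between two runs of the task
    }
    std::cout << "cleanup" << std::endl;
}

void handler(int sig) {
    std::cout << "got signal:" << sig << std::endl;
    g_running = false;
}

int main() {
    struct sigaction sa {};
    sa.sa_handler = handler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    sigaction(SIGTERM, &sa, nullptr);
    sigaction(SIGINT, &sa, nullptr);
    std::thread t(task);
    t.join();
    std::cout << "thread exit" << std::endl;
    return 0;
}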
这段程序用 sleep 或 nanosleep 作为时间间隔, 并监听 SIGINT(ctrl + c) 和 SIGTERM(kill <pid>) 两个信号. 但有以下几个问题:
- sleep 和 nanosleep 无法被主动唤醒, 所以程序在接收到 SIGINT 或 SIGTERM 之后, 必须等待 sleep 正常返回才能执行后面的 cleanup 代码.
- 如果不监听 SIGINT 或 SIGTERM, 那么系统会执行默认的 handler, 并终止程序, 这会 interrupt sleep 函数, 但也会导致 cleanup 代码被跳过.
- sleep 是 POSIX 库函数(声明在 unistd.h 中), 不属于 C++ 标准库, 其实现与平台相关, 所以可移植性不好. 有些实现会用可移植性更好的 select 函数代替 sleep, 但这同样会面临无法被主动唤醒的问题.
更好的实现方式是利用 C++11 中的 mutex 和 condition_variable: 用 condition_variable::wait_for 实现可中断(interruptible)的 sleep. 正常情况下 wait_for 会一直等到超时才返回; 接收到退出信号之后, 休眠中的线程会立即被唤醒, 退出 while 循环, 并执行 cleanup 代码.
#include <signal.h>

#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

/// A sleep that another thread can cut short.
///
/// wait_for() blocks for at most the given duration; interrupt() wakes every
/// waiter immediately and makes all later waits return at once.
class InterruptibleSleeper {
public:
    /// Sleeps for up to `time`.
    /// @return true if the full duration elapsed, false if interrupt() was
    ///         called (before or during the wait).
    template <typename Rep, typename Period>
    bool wait_for(const std::chrono::duration<Rep, Period> &time) {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate overload returns the predicate's value, so a spurious
        // wakeup never ends the sleep early; negate it to get "not killed".
        return !cv_.wait_for(lock, time, [&] { return interrupted_; });
    }

    /// Wakes all sleeping threads; the interrupted state is permanent.
    void interrupt() {
        std::unique_lock<std::mutex> lock(mutex_);
        interrupted_ = true;
        cv_.notify_all();
    }

private:
    std::condition_variable cv_;
    std::mutex mutex_;
    bool interrupted_ = false;  // guarded by mutex_
};

InterruptibleSleeper sleeper;

/// Periodic worker: does one unit of work every 30 s until interrupted, then
/// runs its cleanup step.
void task() {
    while (sleeper.wait_for(std::chrono::milliseconds(30000))) {
        std::cout << "working" << std::endl;
    }
    std::cout << "cleanup" << std::endl;
}

/// Signal handler for SIGINT / SIGTERM.
///
/// NOTE(review): iostream output and mutex locking are not on the POSIX
/// async-signal-safe list. That is tolerable for a demo; production code
/// should forward the signal to normal thread context first (for example a
/// dedicated thread blocked in sigwait()).
void handler(int sig) {
    std::cout << "got signal:" << sig << std::endl;
    // After interrupt() the sleeping thread wakes up and runs its cleanup.
    sleeper.interrupt();
}

int main() {
    // Prefer sigaction over signal: its semantics are well defined.
    struct sigaction sa {};
    sa.sa_handler = handler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    sigaction(SIGTERM, &sa, nullptr);
    sigaction(SIGINT, &sa, nullptr);

    std::thread t(task);
    t.join();
    std::cout << "thread exit" << std::endl;
    return 0;
}
Reference :
std::lock_guard or std::unique_lock
https://github.com/forhappy/Cplusplus-Concurrency-In-Practice/blob/master/zh/chapter4-Mutex/4.3%20Lock-tutorial.md
std::lock_guard
#include <iostream>
#include <sstream>
#include <map>
#include <string>
#include <chrono>
#include <thread>
#include <mutex>
#include <vector>
// Shared url -> content table. std::map is not thread-safe by itself, so
// accesses made while worker threads are running must hold g_pages_mutex.
std::map<std::string, std::string> g_pages;
std::mutex g_pages_mutex;

/// Pretends to download `url` and records the result in g_pages.
/// @param url key under which the fake content is stored
void save_page(const std::string &url)
{
    // Stand-in for a slow page fetch; runs before the lock is taken.
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    const std::string content("fake content");

    // The guard calls g_pages_mutex.lock() here and unlock() when it goes
    // out of scope, replacing the manual lock/unlock pair.
    const std::lock_guard<std::mutex> hold(g_pages_mutex);
    std::cout << "save page for " << url << std::endl;
    g_pages[url] = content;
}
/// Fetches ten fake pages on ten threads, then prints everything that was
/// stored in g_pages.
int main()
{
    constexpr int kPageCount = 10;

    std::vector<std::thread> thread_pool;
    thread_pool.reserve(kPageCount);
    for (int i = 0; i < kPageCount; i++) {
        // std::thread stores its own copy of the argument, so handing it a
        // temporary string is safe.
        thread_pool.emplace_back(save_page, "url:" + std::to_string(i));
    }

    // Join BEFORE reading g_pages. join() is what orders the workers' writes
    // before the reads below; merely sleeping "long enough" gives no such
    // guarantee. It also has to happen before main returns, because
    // destroying a still-joinable std::thread calls std::terminate.
    for (std::thread &t : thread_pool) {
        t.join();
    }

    // Safe to access g_pages without the lock now: every worker has finished.
    for (const auto &pair : g_pages) {
        std::cout << pair.first << " => " << pair.second << '\n';
    }
}
std::unique_lock
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::lock, std::unique_lock
// std::adopt_lock, std::defer_lock
// The two mutexes that task_a and task_b both need to hold.
std::mutex foo;
std::mutex bar;

/// Locks both mutexes up front, then hands them to RAII owners.
void task_a()
{
    // std::lock acquires both mutexes with a deadlock-avoidance algorithm.
    std::lock(foo, bar);

    // adopt_lock: the mutex is already held, so the wrapper must not lock it
    // again; it only takes over the duty of unlocking on destruction.
    std::unique_lock<std::mutex> foo_owner{foo, std::adopt_lock};
    std::unique_lock<std::mutex> bar_owner{bar, std::adopt_lock};

    std::cout << "task a\n";
}
/// Wraps both mutexes first (unlocked), then locks the wrappers together.
void task_b()
{
    // defer_lock: associate the wrapper with the mutex without locking it yet.
    // This stands in for calling bar.lock() and foo.lock() by hand.
    std::unique_lock<std::mutex> bar_owner(bar, std::defer_lock);
    std::unique_lock<std::mutex> foo_owner(foo, std::defer_lock);

    // Lock both wrappers in one deadlock-free step.
    std::lock(bar_owner, foo_owner);

    std::cout << "task b\n";
    // Both wrappers unlock their mutex when they go out of scope.
}
int main ()
{
std::thread th1 (task_a);
std::thread th2 (task_b);
th1.join();
th2.join();
return 0;
}
std::ref
std::ref / std::cref 用于 std::bind、std::thread 等会按值拷贝参数的场合: 用它们包装参数之后, 传递的是引用而不是拷贝。
/// Prints its three arguments on one line, then increments the two mutable ones.
/// @param n1 incremented; when called through a bind object that stored n1
///           by value, this is the bound copy, not the caller's variable
/// @param n2 incremented; with std::ref this is the caller's own variable
/// @param n3 read-only (modifying it would not compile)
void f(int& n1, int& n2, const int& n3)
{
    std::cout << "In function: " << n1;
    std::cout << ' ' << n2;
    std::cout << ' ' << n3 << '\n';

    n1 += 1;
    n2 += 1;
}
/// Shows what std::bind captures by value versus through std::ref/std::cref.
int main()
{
    int by_value = 1;
    int by_ref = 2;
    int by_cref = 3;

    // by_value is copied into the bind object right here; the other two are
    // stored as references to the variables above.
    std::function<void()> bound_f =
        std::bind(f, by_value, std::ref(by_ref), std::cref(by_cref));

    // Changes made after binding are visible only through the references.
    by_value = 10;
    by_ref = 11;
    by_cref = 12;

    std::cout << "Before function: " << by_value << ' ' << by_ref << ' ' << by_cref << '\n';
    bound_f();
    std::cout << "After function: " << by_value << ' ' << by_ref << ' ' << by_cref << '\n';
}
std::thread
/// Prints a status line ten times, pausing 10 ms after each one.
void worker_thread()
{
    for (int remaining = 10; remaining > 0; --remaining) {
        std::cout << "Thread 2 executing\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}
/// Prints a status line five times, 10 ms apart.
/// @param n taken by value, so the increments below touch only this
///          function's private copy
void f1(int n)
{
    int round = 0;
    while (round < 5) {
        std::cout << "Thread 3 executing\n";
        ++n;
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        ++round;
    }
}
/// Prints a status line five times, 10 ms apart, incrementing `n` each time.
/// @param n taken by reference: the caller's variable ends up 5 larger
void f2(int& n)
{
    const std::chrono::milliseconds pause(10);
    for (int left = 5; left > 0; --left) {
        std::cout << "Thread 4 executing\n";
        n += 1;
        std::this_thread::sleep_for(pause);
    }
}
/// Example type whose member function is used as a thread entry point.
class foo
{
public:
    /// Prints a status line five times, 10 ms apart, bumping `n` each time.
    void bar()
    {
        int step = 0;
        while (step != 5) {
            std::cout << "Thread 5 executing\n";
            n += 1;
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            ++step;
        }
    }

    // Number of iterations bar() has completed on this object.
    int n = 0;
};
/// Example callable object (functor) used as a thread entry point.
class baz
{
public:
    /// Prints a status line five times, 10 ms apart, bumping `n` each time.
    void operator()()
    {
        const std::chrono::milliseconds pause(10);
        for (int pass = 1; pass <= 5; ++pass) {
            std::cout << "Thread 6 executing\n";
            n += 1;
            std::this_thread::sleep_for(pause);
        }
    }

    // Number of iterations operator() has completed on this object.
    int n = 0;
};
/// Starts one thread per kind of callable (free function, by-value and
/// by-reference arguments, moved thread, member function, functor), waits
/// for all of them, then reports the shared counter.
int main(int argc, const char * argv[]) {
    int n = 0;
    foo f;
    baz b;

    std::thread t1;                  // default-constructed: no thread, not joinable
    std::thread t2(worker_thread);
    std::thread t3(f1, n);           // f1 receives a copy of n
    std::thread t4(f2, std::ref(n)); // f2 increments main's n through the reference
    std::thread t5(std::move(t3));   // t5 now owns the thread running f1(); t3 is no longer joinable
    std::thread t6(&foo::bar, &f);   // runs f.bar() on f itself (pointer, not a copy)
    std::thread t7(b);               // runs a copy of b; main's b.n stays unchanged

    // Join every thread that actually owns a thread of execution. t1 and the
    // moved-from t3 are skipped: join() on a non-joinable thread throws.
    // Joining also makes the read of n below well-defined (t4 has finished
    // writing it) and lets main return without a joinable std::thread being
    // destroyed, which would call std::terminate.
    t2.join();
    t4.join();
    t5.join();
    t6.join();
    t7.join();

    std::cout << "Final value of n is " << n << '\n';
    return 0;
}
上面的代码中, std::thread t4(f2, std::ref(n)); 如果改为 std::thread t4(f2, n); 会编译报错: std::thread 会先拷贝参数, 再把拷贝以右值的形式传给 f2, 而右值无法绑定到 int& 形参。
对一个没有关联任何线程的 std::thread 对象(例如默认构造的 t1)调用 join(), 会抛出 std::system_error; 若该异常未被捕获, 程序会被终止。
如果一个仍处于 joinable 状态的 std::thread 对象在析构前既没有 join() 也没有 detach()(例如 main 函数直接返回), 其析构函数会调用 std::terminate, 同样会造成程序崩溃。