C++ 定时器

定时器中主要的数据结构

  • 优先级任务队列:队列中存储任务,每个任务会添加时间戳,最近的时间戳的任务会先出队。
  • 锁和条件变量:当有任务需要执行时,用于通知正在等待的线程从任务队列中取出任务执行。
  • 线程池:各个任务会放在线程池中执行。

下面是相关代码:

  1. class TimerQueue {
  2. public:
  3. struct InternalS {
  4. std::chrono::time_point<std::chrono::high_resolution_clock> time_point_;
  5. std::function<void()> func_;
  6. bool operator<(const InternalS& b) const { return time_point_ > b.time_point_; }
  7. };
  8. enum class RepeatedIdState { kInit = 0, kRunning = 1, kStop = 2 };
  9. private:
  10. std::priority_queue<InternalS> queue_;
  11. bool running_ = false;
  12. std::mutex mutex_;
  13. std::condition_variable cond_;
  14. wzq::ThreadPool thread_pool_;
  15. std::atomic<int> repeated_func_id_;
  16. wzq::ThreadSafeMap<int, RepeatedIdState> repeated_id_state_map_;
  17. };

初始化

在构造函数中初始化,主要是配置好内部的线程池,线程池中常驻的线程数目前设为4。

  1. TimerQueue() : running_(true), thread_pool_(wzq::ThreadPool::ThreadPoolConfig{4, 4, 40, std::chrono::seconds(4)}) {
  2. repeated_func_id_.store(0);
  3. }

开启定时器功能

打开内部的线程池功能,用于执行放入定时器中的任务,同时新开一个线程,循环等待任务到来后送入线程池中执行。

  1. bool Run() {
  2. bool ret = thread_pool_.Start();
  3. if (!ret) {
  4. return false;
  5. }
  6. std::thread([this]() { RunLocal(); }).detach();
  7. return true;
  8. }
  9. void RunLocal() {
  10. while (running_) {
  11. std::unique_lock<std::mutex> lock(mutex_);
  12. if (queue_.empty()) {
  13. cond_.wait(lock);
  14. continue;
  15. }
  16. auto s = queue_.top();
  17. auto diff = s.time_point_ - std::chrono::high_resolution_clock::now();
  18. if (std::chrono::duration_cast<std::chrono::milliseconds>(diff).count() > 0) {
  19. cond_.wait_for(lock, diff);
  20. continue;
  21. } else {
  22. queue_.pop();
  23. lock.unlock();
  24. thread_pool_.Run(std::move(s.func_));
  25. }
  26. }
  27. }

关闭定时器功能

这里是使用running_标志位控制,标志位为false,调度线程的循环就会自动退出,就不会继续等待任务执行。

  1. void Stop() {
  2. running_ = false;
  3. cond_.notify_all();
  4. }

在某一时间点执行任务

根据时间戳构造InternalS,放入队列中:

  1. template <typename F, typename... Args>
  2. void AddFuncAtTimePoint(const std::chrono::time_point<std::chrono::high_resolution_clock>& time_point, F&& f,
  3. Args&&... args) {
  4. InternalS s;
  5. s.time_point_ = time_point;
  6. s.func_ = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
  7. std::unique_lock<std::mutex> lock(mutex_);
  8. queue_.push(s);
  9. cond_.notify_all();
  10. }

在某段时间后执行任务

根据当前时间加上时间段构造出时间戳从而构造InternalS,放入队列中:

  1. template <typename R, typename P, typename F, typename... Args>
  2. void AddFuncAfterDuration(const std::chrono::duration<R, P>& time, F&& f, Args&&... args) {
  3. InternalS s;
  4. s.time_point_ = std::chrono::high_resolution_clock::now() + time;
  5. s.func_ = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
  6. std::unique_lock<std::mutex> lock(mutex_);
  7. queue_.push(s);
  8. cond_.notify_all();
  9. }

循环执行任务

首先为这个循环任务生成标识ID,外部可以通过ID来取消此任务继续执行,代码如下,内部以类似递归的方式循环执行任务。

  1. template <typename R, typename P, typename F, typename... Args>
  2. int AddRepeatedFunc(int repeat_num, const std::chrono::duration<R, P>& time, F&& f, Args&&... args) {
  3. int id = GetNextRepeatedFuncId();
  4. repeated_id_state_map_.Emplace(id, RepeatedIdState::kRunning);
  5. auto tem_func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
  6. AddRepeatedFuncLocal(repeat_num - 1, time, id, std::move(tem_func));
  7. return id;
  8. }
  9. int GetNextRepeatedFuncId() { return repeated_func_id_++; }
  10. template <typename R, typename P, typename F>
  11. void AddRepeatedFuncLocal(int repeat_num, const std::chrono::duration<R, P>& time, int id, F&& f) {
  12. if (!this->repeated_id_state_map_.IsKeyExist(id)) {
  13. return;
  14. }
  15. InternalS s;
  16. s.time_point_ = std::chrono::high_resolution_clock::now() + time;
  17. auto tem_func = std::move(f);
  18. s.repeated_id = id;
  19. s.func_ = [this, &tem_func, repeat_num, time, id]() {
  20. tem_func();
  21. if (!this->repeated_id_state_map_.IsKeyExist(id) || repeat_num == 0) {
  22. return;
  23. }
  24. AddRepeatedFuncLocal(repeat_num - 1, time, id, std::move(tem_func));
  25. };
  26. std::unique_lock<std::mutex> lock(mutex_);
  27. queue_.push(s);
  28. lock.unlock();
  29. cond_.notify_all();
  30. }

取消循环任务的执行

定时器内部有repeated_id_state_map 数据结构,用于存储循环任务的ID,当取消任务执行时,将此ID从repeatedid_state_map中移除,循环任务就会自动取消。

  1. void CancelRepeatedFuncId(int func_id) { repeated_id_state_map_.EraseKey(func_id); }

简单的测试代码

  1. void TestTimerQueue() {
  2. TimerQueue q;
  3. q.Run();
  4. for (int i = 5; i < 15; ++i) {
  5. q.AddFuncAfterDuration(std::chrono::seconds(i + 1), [i]() { std::cout << "this is " << i << std::endl; });
  6. q.AddFuncAtTimePoint(std::chrono::high_resolution_clock::now() + std::chrono::seconds(1),
  7. [i]() { std::cout << "this is " << i << " at " << std::endl; });
  8. }
  9. int id = q.AddRepeatedFunc(10, std::chrono::seconds(1), []() { std::cout << "func " << std::endl; });
  10. std::this_thread::sleep_for(std::chrono::seconds(4));
  11. q.CancelRepeatedFuncId(id);
  12. std::this_thread::sleep_for(std::chrono::seconds(30));
  13. q.Stop();
  14. }