1. 协程类封装

概念:协程是用户态的线程,仅仅完成代码序列的切换而不陷入内核态发生内核资源的切换。

基本原理:采用非对称协程开发。所有子协程的调度和创建依赖于一个母协程init_coroutine,即:一个线程需要启动协程时,首先会创建出一个默认的母协程。通过母协程让出当前代码序列在CPU的执行权,转让到子协程对应的代码序列中继续执行,子协程完毕后让出执行权又回到母协程中。

协程开发  (一) - 图1

  • 切换逻辑: 协程开发  (一) - 图2

    准备:依赖的库文件**<ucontext.h>**

  • 切换程序上下文API:

    1. //依赖实现的结构体
    2. typedef struct ucontext {
    3. struct ucontext *uc_link; //后续的程序上下文
    4. sigset_t uc_sigmask; //当前上下文中的阻塞信号集合
    5. stack_t uc_stack; //当前上下文依赖的栈空间信息集合
    6. mcontext_t uc_mcontext;
    7. ...
    8. } ucontext_t;
  • int getcontext(ucontext_t *ucp); 保存当前程序上下文。

  • int setcontext(const ucontext_t *ucp); 将保存的程序上下文拿出推送到CPU上
  • void makecontext(ucontext_t *ucp, void (*func)(), int argc, ...); 创建新上下文
  • int swapcontext(ucontext_t *oucp, ucontext_t *ucp); 切换当前上下文 oucp——>ucp

**setcontext/swapcontext**会激活保存好的程序上下文去执行,也就意味着必须先调用**getcontext**

注意:使用makecontext时,必须先调用一次getcontext(个人猜测是因为,重新创建一个上下文使用除了要传函数的入口地址,其余的CPU寄存器值也要保存,getcontext可以代劳),然后为ucontext_t设置新的栈空间传入:uc_stack.ss_sp栈起始地址,uc_stack.ss_size栈空间大小、后续需要运行的上下文uc_link

1.1 成员变量

  1. /**
  2. * @brief 协程类
  3. */
  4. class Coroutine: public std::enable_shared_from_this<Coroutine>
  5. {
  6. ...
  7. ...
  8. public:
  9. /**
  10. * @brief 协程运行状态枚举类型
  11. */
  12. enum State
  13. {
  14. INIT, //初始状态
  15. HOLD, //挂起状态
  16. EXEC, //运行状态
  17. TERM, //终止状态
  18. READY, //就绪状态
  19. EXCEPTION //异常状态
  20. };
  21. private:
  22. ///协程ID
  23. uint64_t m_id = 0;
  24. ///用户栈大小
  25. uint32_t m_stack_size = 0;
  26. ///协程状态
  27. State m_state = INIT;
  28. ///协程携带的程序上下文
  29. ucontext_t m_ctx;
  30. ///用户栈起始
  31. void *m_stack = nullptr;
  32. ///协程执行的回调函数
  33. std::function<void()> m_cb;
  34. };

** 配置项

1). 记录协程信息的线程局部变量

  1. /**
  2. * @brief 协程ID累加器
  3. */
  4. static std::atomic<uint64_t> s_cor_id(0);
  5. /**
  6. * @brief 当前线程下存在协程的总数
  7. */
  8. static std::atomic<uint64_t> s_cor_sum(0);
  9. /**
  10. * @brief 当前线程下正在运行协程
  11. */
  12. static thread_local Coroutine* cor_this = nullptr;
  13. /**
  14. * @brief 上一次切出的协程
  15. */
  16. static thread_local Coroutine::ptr init_cor_sp = nullptr;

2). 协程栈信息

  1. /**
  2. * @brief 配置项 每个协程的栈默认大小为1MB
  3. */
  4. static ConfigVar<uint32_t>::ptr g_cor_stack_size =
  5. Config::LookUp("coroutine.stack_size", (uint32_t)1024*1024, "coroutine stack size");
  6. /**
  7. * @brief 协程栈内存分配器类
  8. */
  9. class MallocStackAllocator
  10. {
  11. public:
  12. /**
  13. * @brief 分配内存
  14. * @param[in] size 所需内存大小
  15. * @return void*
  16. */
  17. static void* Alloc(size_t size)
  18. {
  19. return malloc(size);
  20. }
  21. /**
  22. * @brief 释放内存
  23. * @param[in] vp 栈空间指针
  24. * @param[in] size 栈空间大小
  25. */
  26. static void Dealloc(void *vp, size_t size)
  27. {
  28. free(vp);
  29. }
  30. };
  31. //使用using起别名
  32. using StackAllocator = MallocStackAllocator;

1.2 接口

1.2.1 构造函数

  1. /**
  2. * @brief 协程类构造函数
  3. * @param[in] cb 指定的执行函数
  4. * @param[in] stack_size 协程栈空间大小
  5. * @param[in] use_call 是否作为调度协程使用
  6. */
  7. Coroutine(std::function<void()> cb, size_t stack_size = 0, bool use_call = false);
  8. Coroutine::Coroutine(std::function<void()> cb, size_t stack_size, bool use_call)
  9. :m_id(++s_cor_id), m_cb(cb)
  10. {
  11. ++s_cor_sum;
  12. //为协程分配栈空间 让回调函数在对应栈空间去运行
  13. m_stack_size = stack_size ? stack_size : g_cor_stack_size->getValue();
  14. m_stack = StackAllocator::Alloc(m_stack_size);
  15. if(getcontext(&m_ctx) < 0)
  16. {
  17. KIT_LOG_ERROR(g_logger) << "Cortione: getcontext error";
  18. KIT_ASSERT2(false, "getcontext error");
  19. }
  20. //指定代码序列执行完毕后 自动跳转到的指定地方
  21. m_ctx.uc_link = nullptr;
  22. m_ctx.uc_stack.ss_sp = m_stack;
  23. m_ctx.uc_stack.ss_size = m_stack_size;
  24. //use_call 标识当前的协程是否是调度协程
  25. if(!use_call)
  26. //指定要运行的代码序列
  27. makecontext(&m_ctx, &Coroutine::MainFunc, 0);
  28. else
  29. makecontext(&m_ctx, &Coroutine::CallMainFunc, 0);
  30. KIT_LOG_DEBUG(g_logger) << "协程构造:" << m_id;
  31. }

** 私有的默认构造函数

功能:为init协程的生成而准备,生成线程下的第一个协程。

  1. /**
  2. * @brief 协程类默认构造函数 负责生成init协程
  3. */
  4. Coroutine();
  5. Coroutine::Coroutine()
  6. {
  7. m_state = State::EXEC;
  8. SetThis(this);
  9. if(getcontext(&m_ctx) < 0)
  10. {
  11. KIT_LOG_ERROR(g_logger) << "Cortione: getcontext error";
  12. KIT_ASSERT2(false, "getcontext error");
  13. }
  14. ++s_cor_sum;
  15. }

1.2.2 析构函数

  1. /**
  2. * @brief 协程类析构函数
  3. */
  4. ~Coroutine();
  5. Coroutine::~Coroutine()
  6. {
  7. --s_cor_sum;
  8. if(m_stack)
  9. {
  10. //只要不是运行态 或者 挂起就释放栈空间
  11. KIT_ASSERT(m_state != State::EXEC || m_state != State::HOLD);
  12. //释放栈空间
  13. StackAllocator::Dealloc(m_stack, m_stack_size);
  14. KIT_LOG_DEBUG(g_logger) << "协程析构:" << m_id;
  15. }
  16. else //没有栈是主协程
  17. {
  18. KIT_ASSERT(!m_cb);
  19. KIT_ASSERT(m_state == State::EXEC);
  20. Coroutine* cur = cor_this;
  21. if(cur == this)
  22. SetThis(nullptr);
  23. }
  24. }

**私有接口

MainFunc()/CallMainFunc()

  1. /**
  2. * @brief 一般协程的回调主函数
  3. */
  4. static void MainFunc();
  5. void Coroutine::MainFunc()
  6. {
  7. Coroutine::ptr cur = GetThis();
  8. KIT_ASSERT(cur);
  9. try
  10. {
  11. cur->m_cb();
  12. cur->m_cb = nullptr;
  13. cur->m_state = State::TERM; //协程已经执行完毕。
  14. }
  15. catch(const std::exception &e)
  16. {
  17. cur->m_state = State::EXCEPTION;
  18. KIT_LOG_ERROR(g_logger) << "Coroutine: MainFunc exception:" << e.what()
  19. << std::endl
  20. << BackTraceToString();
  21. }
  22. catch(...)
  23. {
  24. cur->m_state = State::EXCEPTION;
  25. KIT_LOG_ERROR(g_logger) << "Coroutine: MainFunc exception:" << ",but dont konw reason"
  26. << std::endl
  27. << BackTraceToString();
  28. }
  29. auto p = cur.get();
  30. cur.reset(); //让其减少一次该函数调用中应该减少的引用次数
  31. p->swapOut();
  32. //不会再回到这个地方 回来了说明有问题
  33. KIT_ASSERT2(false, "never reach here!");
  34. }
  35. /**
  36. * @brief 持有调度器协程的回调主函数
  37. */
  38. static void CallMainFunc();
  39. void Coroutine::CallMainFunc()
  40. {
  41. Coroutine::ptr cur = GetThis();
  42. KIT_ASSERT(cur);
  43. try
  44. {
  45. cur->m_cb();
  46. cur->m_cb = nullptr;
  47. cur->m_state = State::TERM; //协程已经执行完毕。
  48. }
  49. catch(const std::exception &e)
  50. {
  51. cur->m_state = State::EXCEPTION;
  52. KIT_LOG_ERROR(g_logger) << "Coroutine: MainFunc exception:" << e.what()
  53. << std::endl
  54. << BackTraceToString();
  55. }
  56. catch(...)
  57. {
  58. cur->m_state = State::EXCEPTION;
  59. KIT_LOG_ERROR(g_logger) << "Coroutine: MainFunc exception:" << ",but dont konw reson"
  60. << std::endl
  61. << BackTraceToString();
  62. }
  63. auto p = cur.get();
  64. cur.reset(); //让其减少一次该函数调用中应该减少的引用次数
  65. p->back();
  66. //不会再回到这个地方 回来了说明有问题
  67. KIT_ASSERT2(false, "never reach here!");
  68. }

1.2.3 程序上下文切换接口 (核心)

核心函数:利用swapcontext(参数1, 参数2),将目标代码段(参数2)推送到CPU上执行,将当前执行的代码段保存起来(保存到参数1)

1). swapIn()

母协程init——————————>子协程

  1. /**
  2. * @brief init------>子协程
  3. */
  4. void swapIn();
  5. void Coroutine::swapIn()
  6. {
  7. //将当前的子协程Coroutine * 设置到 cor_this中 表明是这个协程正在运行
  8. SetThis(this);
  9. //没在运行态才能 调入运行
  10. KIT_ASSERT(m_state == State::INIT || m_state == State::HOLD);
  11. m_state = State::EXEC;
  12. if(swapcontext(&init_cor_sp->m_ctx, &m_ctx) < 0)
  13. {
  14. KIT_LOG_ERROR(g_logger) << "swapIn: swapcontext error";
  15. KIT_ASSERT2(false, "swapcontext error");
  16. }
  17. }

2). swapOut()

子协程——————————>母协程init

  1. /**
  2. * @brief 子协程------>init
  3. */
  4. void swapOut();
  5. void Coroutine::swapOut()
  6. {
  7. //将母协程init_cor_sp 设置到 cor_this中 表明是这个协程正在运行
  8. SetThis(init_cor_sp.get());
  9. if(swapcontext(&m_ctx, &Scheduler::GetMainCor()->m_ctx) < 0)
  10. {
  11. KIT_LOG_ERROR(g_logger) << "swapOut: swapcontext error";
  12. KIT_ASSERT2(false, "swapcontext error");
  13. }
  14. }

BUG修正1:测试时发生bad_function_call的异常抛出,重新修改调度逻辑

原因:由于要将主线程也加入到工作队列中,从主线程出发的协程切换出去到切换回来的地方不是对应应该切回去的地方,如图: 协程开发  (一) - 图3

  • 逻辑错误点:在Coroutine::MainFunc()中的swapOut()究竟切回到哪个地方,逻辑混乱了。

image.png

  • swapOut()修改如下:

    1. void Coroutine::swapOut()
    2. {
    3. //如果当前不在调度协程上执行代码 说明是从调度协程切过来的 要切回调度协程
    4. if(this != Scheduler::GetMainCor())
    5. {
    6. SetThis(Scheduler::GetMainCor());
    7. if(swapcontext(&m_ctx, &Scheduler::GetMainCor()->m_ctx) < 0)
    8. {
    9. KIT_LOG_ERROR(g_logger) << "swapOut: swapcontext error";
    10. KIT_ASSERT2(false, "swapcontext error");
    11. }
    12. }
    13. else //如果当前在调度协程上执行代码 说明是从init协程切过来的 要切回init协程
    14. {
    15. SetThis(init_cor_sp.get());
    16. if(swapcontext(&m_ctx, &init_cor_sp->m_ctx) < 0)
    17. {
    18. KIT_LOG_ERROR(g_logger) << "swapOut: swapcontext error";
    19. KIT_ASSERT2(false, "swapcontext error");
    20. }
    21. }
    22. }

    BUG修正2:上述的调度逻辑错误源于从init协程切出还是从调度协程切出

    于是将从不同两个协程出发的切换函数单独的封装,因为来自init协程的切换较少,来自调度协程的切换较多。让一组函数负责init协程的切换;另外一组函数负责调度协程的切换

1.2.3.1call()/back()负责init协程

  1. //从init协程 切换到 目标代码
  2. void Coroutine::call()
  3. {
  4. SetThis(this);
  5. m_state = State::EXEC;
  6. //应该是把当前创建调度器的那个协程的上下文拿出来运行
  7. //if(swapcontext(&init_cor_sp->m_ctx, &m_ctx) < 0)
  8. if(swapcontext(&init_cor_sp->m_ctx, &m_ctx) < 0)
  9. {
  10. KIT_LOG_ERROR(g_logger) << "call: swapcontext error";
  11. KIT_ASSERT2(false, "swapcontext error");
  12. }
  13. }
  14. // 目标代码 切换到 从init协程
  15. void Coroutine::back()
  16. {
  17. SetThis(init_cor_sp.get());
  18. if(swapcontext(&m_ctx, &init_cor_sp->m_ctx) < 0)
  19. {
  20. KIT_LOG_ERROR(g_logger) << "back: swapcontext error";
  21. KIT_ASSERT2(false, "swapcontext error");
  22. }
  23. }

1.2.3.2 swapIn()/swapOut()负责调度协程

  1. //从调度器 切换到 到目标代码序列
  2. void Coroutine::swapIn()
  3. {
  4. //将当前的子协程Coroutine * 设置到 cor_this中 表明是这个协程正在运行
  5. SetThis(this);
  6. //没在运行态才能 调入运行
  7. KIT_ASSERT(m_state == State::INIT || m_state == State::HOLD);
  8. m_state = State::EXEC;
  9. if(swapcontext(&Scheduler::GetMainCor()->m_ctx, &m_ctx) < 0)
  10. {
  11. KIT_LOG_ERROR(g_logger) << "swapIn: swapcontext error";
  12. KIT_ASSERT2(false, "swapcontext error");
  13. }
  14. }
  15. //从目标代码序列切换到调度器
  16. void Coroutine::swapOut()
  17. {
  18. SetThis(Scheduler::GetMainCor());
  19. if(swapcontext(&m_ctx, &Scheduler::GetMainCor()->m_ctx) < 0)
  20. {
  21. KIT_LOG_ERROR(g_logger) << "swapOut: swapcontext error";
  22. KIT_ASSERT2(false, "swapcontext error");
  23. }
  24. }

1.2.3.3 协程构造函数中要标识当前协程是否是调度协程use_call

调度器协程要执行CallMainFunc();普通协程要执行MainFunc()
image.png
image.png

1.2.3.4 对应的协程的回调函数MainFunc()也要封装两套,静态成员函数复用原来的代码不太方便

  • void Coroutine::MainFunc():

image.png

  • void Coroutine::CallMainFunc():

image.png

1.2.4 reset()

功能:协程重置,重新指定执行函数。协程状态回到初始状态INIT

  1. /**
  2. * @brief 协程重置 重新指定执行函数
  3. * @param[in] cb 新指定的执行函数
  4. */
  5. void reset(std::function<void()> cb);
  6. void Coroutine::reset(std::function<void()> cb)
  7. {
  8. KIT_ASSERT(m_stack);
  9. KIT_ASSERT(m_state == State::INIT || m_state == State::TERM ||
  10. m_state == State::EXCEPTION);
  11. if(getcontext(&m_ctx) < 0)
  12. {
  13. KIT_LOG_ERROR(g_logger) << "reset: getcontext error";
  14. KIT_ASSERT2(false, "getcontext error");
  15. }
  16. m_cb = cb;
  17. m_ctx.uc_link = nullptr;
  18. m_ctx.uc_stack.ss_sp = m_stack;
  19. m_ctx.uc_stack.ss_size = m_stack_size;
  20. makecontext(&m_ctx, &Coroutine::MainFunc, 0);
  21. m_state = State::INIT;
  22. }

1.2.5 Init()

功能:初始化母协程init,为线程创建第一个协程。

  1. /**
  2. * @brief 初始化母协程init
  3. */
  4. static void Init();
  5. void Coroutine::Init()
  6. {
  7. //创建母协程init
  8. Coroutine::ptr main_cor(new Coroutine);
  9. KIT_ASSERT(cor_this == main_cor.get());
  10. //这句代码很关键
  11. init_cor_sp = main_cor;
  12. }

1.2.6 YieldToReady()

功能:当前协程让出执行权,并置为就绪状态READY

  1. /**
  2. * @brief 当前协程让出执行权,并置为就绪状态READY
  3. */
  4. static void YieldToReady();
  5. void Coroutine::YieldToReady()
  6. {
  7. Coroutine::ptr cur = GetThis();
  8. KIT_ASSERT(cur->m_state == State::EXEC);
  9. cur->m_state = State::READY;
  10. cur->swapOut();
  11. }

1.2.7 YieldToHold()

功能:当前协程让出执行权,并置为挂起状态HOLD

  1. /**
  2. * @brief 当前协程让出执行权,并置为就绪状态HOLD
  3. */
  4. static void YieldToHold();
  5. void Coroutine::YieldToHold()
  6. {
  7. Coroutine::ptr cur = GetThis();
  8. KIT_ASSERT(cur->m_state == State::EXEC);
  9. cur->m_state = State::HOLD;
  10. cur->swapOut();
  11. }

1.2.8 其他常用接口

  1. /**
  2. * @brief 获取协程ID
  3. * @return uint64_t
  4. */
  5. uint64_t getID() const {return m_id;}
  6. /**
  7. * @brief 获取协程运行状态
  8. * @return State
  9. */
  10. State getState() const {return m_state;}
  11. /**
  12. * @brief 设置协程状态
  13. * @param[in] state
  14. */
  15. void setState(State state) {m_state = state;}
  16. /**
  17. * @brief 给当前线程保存正在执行的协程this指针
  18. * @param[in] c 正在运行的协程this指针
  19. */
  20. static void SetThis(Coroutine *c);
  21. void Coroutine::SetThis(Coroutine *c)
  22. {
  23. cor_this = c;
  24. }
  25. /**
  26. * @brief 返回当前在执行的协程的this智能指针
  27. * @return Coroutine::ptr
  28. */
  29. static Coroutine::ptr GetThis();
  30. Coroutine::ptr Coroutine::GetThis()
  31. {
  32. if(cor_this)
  33. {
  34. return cor_this->shared_from_this();
  35. }
  36. Init();
  37. return cor_this->shared_from_this();
  38. }
  39. /**
  40. * @brief 获取当前线程上存在的协程总数
  41. * @return uint64_t
  42. */
  43. static uint64_t TotalCoroutines();
  44. uint64_t Coroutine::TotalCoroutines()
  45. {
  46. return s_cor_sum;
  47. }
  48. /**
  49. * @brief 获取协程ID
  50. * @return uint64_t
  51. */
  52. static uint64_t GetCoroutineId();
  53. uint64_t Coroutine::GetCoroutineId()
  54. {
  55. if(cor_this)
  56. {
  57. return cor_this->getID();
  58. }
  59. return 0;
  60. }

2. 协程调度

实现目标:实现跨线程间的协程切换。让A线程中的1号协程切换到B线程中的3号协程上去执行对应的任务。即:线程间从任务队列”抢”任务,抢到任务之后以协程为目标代码载体,使用同步切换的方式到对应的目标代码上进行执行。

非常非常注意:采用ucontext的API,如果程序上下文衔接不恰当,会导致最后一个协程退出是时候,整个主线程也退出了,这是相当危险的!!!

实现调度器schedule

分配有多个线程,一个线程又有分配多个协程。 N个线程对M个协程。

  1. schedule 是一个线程池,分配一组线程。
  2. schedule 是一个协程调度器,分配协程到相应的线程去执行目标代码
    1. 方式一:协程随机选择一个空闲的任意的线程上执行
    2. 方式二:给协程指定一个线程去执行

2.1 基本的调度思路

协程开发  (一) - 图9

2.2 成员变量

  1. class Schduler
  2. {
  3. ...
  4. ...
  5. protected:
  6. /// 线程ID数组
  7. std::vector<int> m_threadIds;
  8. /// 总共线程数
  9. size_t m_threadSum;
  10. /// 活跃线程数
  11. std::atomic<size_t> m_activeThreadCount = {0};
  12. /// 空闲线程数
  13. std::atomic<size_t> m_idleThreadCount = {0};
  14. /// 主线程ID
  15. pid_t m_mainThreadId = 0;
  16. /// 正在停止运行标志
  17. bool m_stopping = true;
  18. /// 是否自动停止标志
  19. bool m_autoStop = false;
  20. private:
  21. /// 线程池 工作队列
  22. std::vector<Thread::ptr> m_threads;
  23. /// 任务队列
  24. std::list<CoroutineObject> m_coroutines;
  25. /// 互斥锁
  26. MutexType m_mutex;
  27. /// 主协程智能指针
  28. Coroutine::ptr m_mainCoroutine;
  29. // 调度器名称
  30. std::string m_name;
  31. };

*封装一个自定义的可执行对象结构体

目的:让调度器调度执行的对象不仅仅为Coroutine协程体,还可以是一个function<>可调用对象
image.png

  1. private:
  2. /**
  3. * @brief 可执行对象结构体
  4. */
  5. struct CoroutineObject
  6. {
  7. /// 协程
  8. Coroutine::ptr cor;
  9. /// 函数
  10. std::function<void()> cb;
  11. /// 指定的执行线程
  12. pid_t threadId;
  13. CoroutineObject(Coroutine::ptr p, pthread_t t)
  14. :cor(p), threadId(t){ }
  15. CoroutineObject(Coroutine::ptr* p, pthread_t t)
  16. :threadId(t)
  17. {
  18. //减少一次智能指针引用
  19. cor.swap(*p);
  20. }
  21. CoroutineObject(std::function<void()> f, pthread_t t)
  22. :cb(f), threadId(t) { }
  23. CoroutineObject(std::function<void()> *f, pthread_t t)
  24. :threadId(t)
  25. {
  26. //减少一次智能指针引用
  27. cb.swap(*f);
  28. }
  29. /*和STL结合必须有默认构造函数*/
  30. CoroutineObject()
  31. :threadId(-1){ }
  32. void reset()
  33. {
  34. cor = nullptr;
  35. cb = nullptr;
  36. threadId = -1;
  37. }
  38. };

2.3 接口

2.3.1 构造函数

  1. /**
  2. * @brief 调度器类构造函数
  3. * @param[in] name 调度器名称
  4. * @param[in] threads_size 初始线程数量
  5. * @param[in] use_caller 当前线程是否纳入调度队列 默认纳入调度
  6. */
  7. Scheduler(const std::string& name = "", size_t threads_size = 1, bool use_caller = true);
  8. Scheduler::Scheduler(const std::string& name, size_t threads_size, bool use_caller)
  9. :m_name(name.size() ? name : Thread::GetName())
  10. {
  11. KIT_ASSERT(threads_size > 0);
  12. //当前线程作为调度线程使用
  13. if(use_caller)
  14. {
  15. //初始化母协程 pre_cor_sp 被初始化
  16. Coroutine::Init();
  17. --threads_size; //减1是因为当前的这个线程也会被纳入调度 少创建一个线程
  18. //这个断言防止 该线程中重复创建调度器
  19. KIT_ASSERT(Scheduler::GetThis() == nullptr);
  20. t_scheduler = this;
  21. //新创建的主协程会参与到协程调度中
  22. m_mainCoroutine.reset(new Coroutine(std::bind(&Scheduler::run, this), 0, true));
  23. //线程的主协程不再是一开始使用协程初始化出来的那个母协程,而应更改为创建了调度器的协程
  24. t_sche_coroutine = m_mainCoroutine.get();
  25. m_mainThreadId = GetThreadId();
  26. m_threadIds.push_back(m_mainThreadId);
  27. }
  28. else //当前线程不作为调度线程使用
  29. {
  30. m_mainThreadId = -1;
  31. }
  32. //防止线程名称没改
  33. Thread::SetName(m_name);
  34. m_threadSum = threads_size;
  35. }

2.3.2 析构函数

  1. /**
  2. * @brief 调度器类析构函数
  3. */
  4. virtual ~Scheduler();
  5. Scheduler::~Scheduler()
  6. {
  7. //只有正在停止运行才能析构
  8. KIT_ASSERT(m_stopping);
  9. if(Scheduler::GetThis() == this)
  10. {
  11. t_scheduler = nullptr;
  12. }
  13. }

2.3.3 scheduler大概几个核心的接口

  • start() 开启调度器运作
  • stop() 停止调度器运作
  • run() 这个函数真正执行调度逻辑
  • schduleNoLock() 将任务加进队列

下面几个接口主要实现在派生类中,父类中不作核心实现:

  • tickle() 线程唤醒
  • stopping() 线程清理回收
  • idle() 协程没有任务可做时的处理,借助epoll_wait来唤醒有任务可执行

下文展开细说:

1). run()

两个部分在执行该函数:一部分是负责在线程里处理调度工作的主协程 ,一部分是线程池的其他子线程

功能:负责协程调度和线程管理,从任务队列拿出任务,执行对应的任务。

  • 核心逻辑:

while(1)
{

  1. 加锁,取出可执行对象容器/消息队列/任务队列m_couroutine中的元素,解锁

    1. 如果当前的可执行对象没有指定线程 且不是 当前在跑的线程要执行的,就跳过。并且设置一个信号量,如bool is_tickle以便通知其他线程来执行这个属于它们的可执行对象(任务)

    2. 如果当前的可执行对象当前在跑的线程要执行的,检查协程体和回调函数是否为空,为空断言报错;不为空取出,从队列删除该元素

    3. 如果没有一个可执行对象是当前跑的线程应该执行的,就设置一个标志bool is_work表明当前是否有任务可做,没有转去执行idle()函数

  2. 开始执行从队列中拿出的可执行对象,分为:协程和函数。(本质都是要调度协程,只是兼容传入的可执行对象是函数的情况)

    1. 如果为协程,并且当前线程应该工作is_work = true,分情况讨论:

      1. 协程处于没有执行完毕TERM没有异常EXCEPTION,就swapIn()调入(继续)执行。调回(不一定是执行完毕了,也有可能到时了)后,如果处于就绪状态READY需要再一次加入队列中;
      2. 否则,调回后仍处于没有执行完毕TERM没有异常EXCEPTION就要将其置为挂起HOLD状态。
    2. 如果为函数,整个流程和协程一模一样,只是需要使用一个指针创建一个协程使其能够被调度。

}

  1. void Scheduler::run()
  2. {
  3. KIT_LOG_DEBUG(g_logger) << "run start!";
  4. setThis();
  5. //当前线程ID不等于主线程ID
  6. if(GetThreadId() != m_mainThreadId)
  7. {
  8. t_sche_coroutine = Coroutine::GetThis().get();
  9. }
  10. //创建一个专门跑idel()的协程
  11. Coroutine::ptr idle_coroutine(new Coroutine(std::bind(&Scheduler::idle, this)));
  12. Coroutine:: ptr cb_coroutine;
  13. CoroutineObject co;
  14. while(1)
  15. {
  16. /* 一、从消息队列取出可执行对象 */
  17. //清空可执行对象
  18. co.reset();
  19. //是一个信号 没轮到当前线程执行任务 就要发出信号通知下一个线程去处理
  20. bool is_tickle = false;
  21. bool is_work = true;
  22. //加锁
  23. {
  24. //取出协程消息队列的元素
  25. MutexType::Lock lock(m_mutex);
  26. auto it = m_coroutines.begin();
  27. for(;it != m_coroutines.end();++it)
  28. {
  29. //a.当前任务没有指定线程执行 且 不是我当前线程要处理的协程 跳过
  30. if(it->threadId != -1 && it->threadId != GetThreadId())
  31. {
  32. is_tickle = true;
  33. continue;
  34. }
  35. KIT_ASSERT(it->cor || it->cb);
  36. //b.契合线程的协程且正在处理 跳过
  37. // if(it->cor && it->cor->getState() == Coroutine::State::EXEC)
  38. // {
  39. // continue;
  40. // }
  41. //b.是我当前线程要处理的任务/协程 就取出并且删除
  42. co = *it;
  43. m_coroutines.erase(it);
  44. ++m_activeThreadCount;
  45. break;
  46. }
  47. // KIT_LOG_DEBUG(g_logger) << "m_coroutine size=" << m_coroutines.size();
  48. if(it == m_coroutines.end())
  49. {
  50. is_work = false;
  51. }
  52. else
  53. {
  54. KIT_LOG_DEBUG(g_logger) << "拿到一个任务";
  55. }
  56. }//解锁
  57. if(is_tickle)
  58. {
  59. tickle();
  60. }
  61. /*二、根据可执行对象的类型 分为 协程和函数 来分别执行对应可执行操作*/
  62. //a. 如果要执行的任务是协程
  63. //契合当前线程的协程还没执行完毕
  64. if(co.cor && is_work && co.cor->getState() != Coroutine::State::TERM &&
  65. co.cor->getState() != Coroutine::State::EXCEPTION)
  66. {
  67. co.cor->swapIn();
  68. --m_activeThreadCount;
  69. //从上面语句调回之后的处理 分为 还需要继续执行 和 需要挂起
  70. if(co.cor->getState() == Coroutine::State::READY)
  71. {
  72. schedule(co.cor);
  73. }
  74. else if(co.cor->getState() != Coroutine::State::TERM &&
  75. co.cor->getState() != Coroutine::State::EXCEPTION)
  76. {
  77. //协程状态置为HOLD
  78. co.cor->setState(Coroutine::State::HOLD);
  79. }
  80. //可执行对象置空
  81. co.reset();
  82. }
  83. else if(co.cb && is_work) //b. 如果要执行的任务是函数
  84. {
  85. //KIT_LOG_DEBUG(g_logger) << "任务是函数";
  86. if(cb_coroutine) //协程体的指针不为空就继续利用现有空间
  87. {
  88. cb_coroutine->reset(co.cb);
  89. }
  90. else //为空就重新开辟
  91. {
  92. cb_coroutine.reset(new Coroutine(co.cb));
  93. }
  94. //可执行对象置空
  95. co.reset();
  96. cb_coroutine->swapIn();
  97. --m_activeThreadCount;
  98. //从上面语句调回之后的处理 分为 还需要继续执行 和 需要挂起
  99. if(cb_coroutine->getState() == Coroutine::State::READY)
  100. {
  101. schedule(cb_coroutine);
  102. //智能指针置空
  103. cb_coroutine.reset();
  104. }
  105. else if(cb_coroutine->getState() == Coroutine::State::TERM ||
  106. cb_coroutine->getState() == Coroutine::State::EXCEPTION)
  107. {
  108. //把执行任务置为空
  109. cb_coroutine->reset(nullptr);
  110. }
  111. else
  112. {
  113. //状态置为 HOLD
  114. cb_coroutine->setState(Coroutine::State::HOLD);
  115. //智能指针置空
  116. cb_coroutine.reset();
  117. }
  118. }
  119. else //c.没有任务需要执行 去执行idle() --->代表空转
  120. {
  121. //负责idle()的协程结束了 说明当前线程也结束了直接break
  122. if(idle_coroutine->getState() == Coroutine::State::TERM)
  123. {
  124. KIT_LOG_INFO(g_logger) << "idle_coroutine TERM!";
  125. break;
  126. }
  127. ++m_idleThreadCount;
  128. idle_coroutine->swapIn();
  129. --m_idleThreadCount;
  130. if(idle_coroutine->getState() != Coroutine::State::TERM &&
  131. idle_coroutine->getState() != Coroutine::State::EXCEPTION)
  132. {
  133. //状态置为 HOLD
  134. idle_coroutine->setState(Coroutine::State::HOLD);
  135. }
  136. }
  137. }
  138. }

2). start()

功能:开启Schuduler调度器的运行。根据传入的线程数,初始化其余子线程,将调度协程推送到CPU

  1. //开启调度器
  2. void Scheduler::start()
  3. {
  4. MutexType::Lock lock(m_mutex);
  5. //一开始 m_stopping = true
  6. if(!m_stopping)
  7. {
  8. KIT_LOG_WARN(g_logger) << "Scheduler: scheduler is stopping!";
  9. return;
  10. }
  11. m_stopping = false;
  12. KIT_ASSERT(m_threads.empty());
  13. m_threads.resize(m_threadSum);
  14. for(size_t i = 0;i < m_threads.size();++i)
  15. {
  16. m_threads[i].reset(new Thread(std::bind(&Scheduler::run, this), m_name + "_" + std::to_string(i)));
  17. m_threadIds.push_back(m_threads[i]->getId());
  18. }
  19. lock.unlock();
  20. if(m_mainCoroutine)
  21. m_mainCoroutine->call();
  22. }

3). stop()

功能:停止Scheduler调度器运行。分情况讨论:

  1. 只有一个线程(即:主线程/调度协程在运行),并且调度协程处于终止态创建态。直接调stopping()负责清理、回收工作,退出返回。

  2. 有多个线程(是一组线程池),先设置标志位m_stopping,唤醒tickle()其他子线程根据该标志位退出;都完毕之后,再将调度协程唤醒退出。

  1. //调度器停止
  2. void Scheduler::stop()
  3. {
  4. /*
  5. * 用了use_caller的线程 必须在这个线程里去执行stop
  6. * 没有用use_caller的线程 可以任意在别的线程执行stop
  7. */
  8. m_autoStop = true;
  9. //1.只有一个主线程在运行的情况 直接停止即可
  10. if(m_mainCoroutine && m_threadSum == 0 &&
  11. (m_mainCoroutine->getState() == Coroutine::State::TERM ||
  12. m_mainCoroutine->getState() == Coroutine::State::INIT))
  13. {
  14. KIT_LOG_INFO(g_logger) << this << ",scheduler name=" << m_name << " stopped";
  15. m_stopping = true;
  16. if(stopping())
  17. return;
  18. }
  19. //2.多个线程在运行 先把子线程停止 再停止主线程
  20. //主线程Id不为-1说明是创建调度器的线程
  21. if(m_mainThreadId != -1)
  22. {
  23. //当前的执行器要把创建它的线程也使用的时候 它的stop一定要在创建线程中执行
  24. KIT_ASSERT(GetThis() == this);
  25. }
  26. else
  27. {
  28. KIT_ASSERT(GetThis() != this);
  29. }
  30. //其他线程根据这个标志位退出运行
  31. m_stopping = true;
  32. //唤醒其他线程结束
  33. for(size_t i = 0;i < m_threadSum;++i)
  34. {
  35. tickle();
  36. }
  37. //最后让主线程也退出
  38. if(m_mainCoroutine)
  39. {
  40. tickle();
  41. }
  42. if(stopping())
  43. return;
  44. }

4). scheduleNoLock()

功能:将任务加入到任务队列中。接收参数形式:协程Coroutine和函数对象function<>。多封装了一层用于处理单个加入队列和批量加入队列的情况。

  • 核心逻辑:

协程开发  (一) - 图11

  1. //添加任务函数 单个放入队列
  2. template<class CorOrCB>
  3. void schedule(CorOrCB cc, int threadId = -1)
  4. {
  5. bool isEmpty = false;
  6. {
  7. MutexType::Lock lock(m_mutex);
  8. isEmpty = scheduleNoLock(cc, threadId);
  9. }
  10. if(isEmpty)
  11. tickle();
  12. }
  13. //添加任务函数 批量放入队列
  14. template<class InputIterator>
  15. void schedule(InputIterator begin, InputIterator end)
  16. {
  17. bool isEmpty = false;
  18. {
  19. MutexType::Lock lock(m_mutex);
  20. while(begin != end)
  21. {
  22. //只要有一次为真 就认为之前经历了空队列 必然有休眠 就必然要唤醒
  23. isEmpty = scheduleNoLock((&(*begin))) || isEmpty;
  24. ++begin;
  25. }
  26. }
  27. if(isEmpty)
  28. tickle();
  29. }
  30. //添加任务函数 真正执行添加动作
  31. template<class CorOrCB>
  32. bool scheduleNoLock(CorOrCB cc, int threadId = -1)
  33. {
  34. //如果为 true 则说明之前没有任何任务需要执行 现在放入一个任务
  35. bool isEmpty = m_coroutines.empty();
  36. CoroutineObject co(cc, threadId);
  37. if(co.cor || co.cb)
  38. {
  39. m_coroutines.push_back(co);
  40. }
  41. return isEmpty;
  42. }

C++知识点补充复习:std::bind()函数适配器

出处《C++ Primer》 P354

作用:接收一个可调用对象将其转换为一个我们需要的合适的可调用对象,不破坏原有对象的参数列表。
通俗的理解:可以事先将原有的可调用对象的参数进行一些指定(绑定),以符合当前需求场景。

本质上:bind生成的可调用对象会去调用所接受的对象,并且自动根据之前绑定好的参数自动传参

  • 基本用法: ```cpp

    include

    using namespace std;

class A { public: void test(int a, int b) { cout << a + b << endl; }

  1. static test2(int a, int b)
  2. {
  3. cout << a + b << endl;
  4. }
  5. void func()
  6. {
  7. //普通成员函数需要在第一个参数位置传入this指针
  8. auto f1 = std::bind(&A::test, this, 1, 2);
  9. f1();
  10. //静态成员函数不需要传this指针
  11. auto f2 = std::bind(&A::test2, 1, 2);
  12. f2();
  13. }

};

void test(int a, int b) { cout << a + b << endl; }

int main() {

  1. auto f1 = std::bind(&test, 1, 2);
  2. auto f2 = std::bind(&test, std::placeholders::_1, 2);
  3. auto f3 = std::bind(&test, std::placeholders::_1, std::placeholders::_2);
  4. f1();
  5. f2(2);
  6. f3(1, 2);
  7. return 0;

}

  1. - **最先用于解决算法模板**`**algorithm**`**中的一些问题:以**`**find_if**`**算法为例子**
  2. 假设我们需要找出一堆字符串中大于某一个长度sz的字符串,使用`find_if`算法模板如下:
  3. ```cpp
  4. #include <functional>
  5. #include <vector>
  6. #include <string>
  7. #include <algorithm>
  8. int main()
  9. {
  10. vector<string> mv = {"ASD","sdassdfsdfd","w21eeff1","sadawqq1"};
  11. int sz = 4;
  12. auto it = find_if(mv.begin(), mv.end(), [sz](string& a){
  13. return a.size() > sz;
  14. });
  15. cout << "第一个大于sz=" << sz << "的字符串是:" << *it << endl;
  16. return 0;
  17. }

image.png

有了函数适配器则不需要lambda表达式去作捕获:

  1. #include <iostream>
  2. #include <functional>
  3. #include <vector>
  4. #include <string>
  5. #include <algorithm>
  6. bool func(string &a, int sz)
  7. {
  8. return a.size() > sz;
  9. }
  10. int main()
  11. {
  12. vector<string> mv = {"ASD","sdassdfsdfd","w21eeff1","sadawqq1"};
  13. int sz = 4;
  14. auto it = find_if(mv.begin(), mv.end(), std::bind(&func, std::placeholders::_1, 4));
  15. cout << "第一个大于sz=" << sz << "的字符串是:" << *it << endl;
  16. return 0;
  17. }

image.png

  • 值得注意的一个问题:当类内出现函数重载,又要结合bind函数的情况:

需要显式的将类成员函数指针指示出来,来区别不同的重载函数

  1. //错误写法:
  2. class A
  3. {
  4. public:
  5. void test(int a, int b)
  6. {
  7. cout << a + b << endl;
  8. }
  9. static void test(int a, int b, int c)
  10. {
  11. cout << a + b << endl;
  12. }
  13. void func()
  14. {
  15. //由于函数同名,不知道需要绑定哪一个
  16. auto f1 = std::bind(&A::test, this, 1, 2);
  17. auto f2 = std::bind(&A::test, 1, 2, 3);
  18. f1();
  19. f2();
  20. }
  21. };
  1. //正确写法
  2. class A
  3. {
  4. public:
  5. void test(int a, int b)
  6. {
  7. cout << a + b << endl;
  8. }
  9. static void test(int a, int b, int c)
  10. {
  11. cout << a + b << endl;
  12. }
  13. void func()
  14. {
  15. auto f1 = std::bind((void (A::*)(int,int))&A::test, this, 1, 2);
  16. auto f2 = std::bind((void (*)(int, int, int))&A::test, 1, 2, 3);
  17. f1();
  18. f2();
  19. }
  20. };