1. 协程类封装测试

1.1 协程基本切换功能

目的:检查协程的基本切换逻辑是否正常。

1.1.1 调用如下:

  1. void func_in()
  2. {
  3. KIT_LOG_INFO(KIT_LOG_ROOT()) << "func1 begin";
  4. Coroutine::YieldToHold();
  5. KIT_LOG_INFO(KIT_LOG_ROOT()) << "func1 end";
  6. Coroutine::YieldToHold();
  7. }
  8. int main()
  9. {
  10. //这个语句的存在为了初始化母协程 有点瑕疵
  11. Coroutine::GetThis();
  12. KIT_LOG_INFO(KIT_LOG_ROOT()) << "coroutine test begin";
  13. Coroutine::ptr cor1(new Coroutine(func_in));
  14. cor1->swapIn();
  15. KIT_LOG_INFO(KIT_LOG_ROOT()) << "1 swapIn after";
  16. cor1->swapIn();
  17. KIT_LOG_INFO(KIT_LOG_ROOT()) << "2 swapIn after";
  18. KIT_LOG_INFO(KIT_LOG_ROOT()) << "coroutine test end";
  19. return 0;
  20. }

1.1.2 时序逻辑如下:

  • 执行逻辑: 协程开发测试 - 图2
  • 代码逻辑: 协程开发测试 - 图3

    1.1.3 运行结果如下:

    image.png

  • GDB断点观察:可以看出是按照预想的顺序执行代码的

image.png
image.png
image.png

1.1.4 BUG:有两个协程对象,但是只有一个协程被析构,内存泄漏了

  • 原因:在协程执行的回调函数里,最后手动的让子协程切换回主协程中,使其没有退出相应的作用域,触发不了智能指针引用计数减少,导致最后无法自动释放空间。

    1.1.4.1 现象:

    image.png
    image.png
    image.png

    1.1.4.2 解决办法:从智能指针对象拿出裸指针来使用

    image.png

  • 追问:万一此时恰好智能指针管理的对象引用次数为0,不就释放空间了吗?操作了一个空指针nullptr必然段错误呀!

其实不然,cur所指向的空间完全不可能在p->swapOut()前就释放,因为在静态函数MainFunc()中,获取的是当前正在运行的Coroutine对象this指针的智能指针,这里get()取出裸指针正是要减少一次对当前运行协程的”引用计数”,否则退出了该函数,智能指针cur将永远留下来,造成内存泄漏。

  • 区别:空指针能调用成员函数的情况。如下一个空的类对象指针依然能调成员函数: ```cpp class A { public:

    void func() {

    1. cout << 6666 << endl;

    }

private: int a; };

int main() { A *a = nullptr; a->func();

  1. return 0;

}

  1. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/25460685/1639573136824-f9e4a2b7-6fb1-431c-b1c3-c670e8992e11.png#clientId=u87a8ca87-be2d-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=35&id=u45c5793f&margin=%5Bobject%20Object%5D&name=image.png&originHeight=35&originWidth=520&originalType=binary&ratio=1&rotation=0&showTitle=false&size=3845&status=done&style=none&taskId=uebad0247-61cb-47a6-9fa3-e66d68a2342&title=&width=520)
  2. <a name="gBdIN"></a>
  3. ## 1.2 多线程下协程的切换
  4. <a name="RiRjO"></a>
  5. ### 1.2.1 调用如下:
  6. ```cpp
  7. void test_coroutine()
  8. {
  9. KIT_LOG_INFO(KIT_LOG_ROOT()) << "sub_thread test begin";
  10. //初始化母协程
  11. Coroutine::GetThis();
  12. //创建一个子协程
  13. Coroutine::ptr cor1(new Coroutine(func_in));
  14. cor1->swapIn();
  15. KIT_LOG_INFO(KIT_LOG_ROOT()) << "1 swapIn after";
  16. cor1->swapIn();
  17. KIT_LOG_INFO(KIT_LOG_ROOT()) << "2 swapIn after";
  18. KIT_LOG_INFO(KIT_LOG_ROOT()) << "sub_thread test end";
  19. }
  20. int main()
  21. {
  22. Thread::_setName("main thread");
  23. vector<Thread::ptr> mv;
  24. //启动3个线程 每个线程上有2个协程:1个母协程 1个子协程
  25. for(int i = 0; i < 3;i++)
  26. {
  27. Thread::ptr p(new Thread(&test_coroutine, "t_" + to_string(i)));
  28. mv.emplace_back(p);
  29. }
  30. for(auto &x : mv)
  31. x->join();
  32. return 0;
  33. }

1.2.2 运行结果如下:

image.png

2. 协程调度测试

2.1 调度器的创建、开启、停止

2.1.1 调用如下:

  1. int main()
  2. {
  3. //将调度器命名为"test"
  4. Scheduler sc("test");
  5. //开启调度器
  6. sc.start();
  7. //结束调度器
  8. sc.stop();
  9. return 0;
  10. }

2.1.2 BUG运行结果如下

可以看到触发了一个异常:bad_function_call这个异常是因为函数包装器function<>的内容是空nullptr而我们发现它被企图调用。
说明此时的问题:主线程中的0号协程类对象中的function<>没有被赋值内容,0号协程不应该会去执行MainFunc(),进一步说明调度逻辑有问题。
image.png

2.1.3 BUG一番修改后,再一次运行结果如下:

发现代码跳转陷入到一个循坏中,发生了隐性的一个代码逻辑成环的一个情况。
image.png

  • 以上结果的代码的逻辑如下:

协程开发测试 - 图15

2.1.4 导致死循环的问题代码块:

  • **swapin()**函数:(init母协程)init_cor_sp->m_ctx————-切换————->(当前协程)this->m_ctx ```cpp void Coroutine::swapIn() { SetThis(this); //没在运行态才能 调入运行 KIT_ASSERT(m_state != State::EXEC);

    m_state = State::EXEC; if(swapcontext(&init_cor_sp->m_ctx, &m_ctx) < 0) {

    1. KIT_LOG_ERROR(g_logger) << "swapIn: swapcontext error";
    2. KIT_ASSERT2(false, "swapcontext error");

    }

}

  1. - `**swapout()**`**函数分两种情况:**
  2. - 当前协程**不是**调度器协程:
  3. `(当前协程)this->m_ctx`---------切换--------->`(调度器协程)Scheduler::GetMainCor()->m_ctx`
  4. - 当前协程**是**调度器协程:
  5. `(当前协程)this->m_ctx`---------切换--------->`(init母协程)init_cor_sp->m_ctx`
  6. ```cpp
  7. void Coroutine::swapOut()
  8. {
  9. if(this != Scheduler::GetMainCor())
  10. {
  11. SetThis(Scheduler::GetMainCor());
  12. if(swapcontext(&m_ctx, &Scheduler::GetMainCor()->m_ctx) < 0)
  13. {
  14. KIT_LOG_ERROR(g_logger) << "swapOut: swapcontext error";
  15. KIT_ASSERT2(false, "swapcontext error");
  16. }
  17. }
  18. else
  19. {
  20. SetThis(init_cor_sp.get());
  21. if(swapcontext(&m_ctx, &init_cor_sp->m_ctx) < 0)
  22. {
  23. KIT_LOG_ERROR(g_logger) << "swapOut: swapcontext error";
  24. KIT_ASSERT2(false, "swapcontext error");
  25. }
  26. }
  27. }

成因:swapin()swapout()切换代码序列时存储切出位置没有对应正确,从run()切换swapIn()要保存的m_ctx本应该在Scheduler对象中但跑到了init_cor_sp中;从MainFunc()----idle()切换swapOut()选择了从Scheduler对象中切回恢复,但此时里面保存的根本不是上一次正确的应该切回的代码序列。

解决要点:让切出、切回的代码序列对应起来,不要把不同代码序列的切换混在一起即可。

2.1.5 将swapIn()函数修改后

  1. //从调度器 切换到 到目标代码序列
  2. void Coroutine::swapIn()
  3. {
  4. SetThis(this);
  5. //没在运行态才能 调入运行
  6. KIT_ASSERT(m_state == State::INIT || m_state == State::HOLD);
  7. m_state = State::EXEC;
  8. // if(swapcontext(&init_cor_sp->m_ctx, &m_ctx) < 0)
  9. // {
  10. // KIT_LOG_ERROR(g_logger) << "swapIn: swapcontext error";
  11. // KIT_ASSERT2(false, "swapcontext error");
  12. // }
  13. if(swapcontext(&Scheduler::GetMainCor()->m_ctx, &m_ctx) < 0)
  14. {
  15. KIT_LOG_ERROR(g_logger) << "swapIn: swapcontext error";
  16. KIT_ASSERT2(false, "swapcontext error");
  17. }
  18. }

运行结果如下:已经基本正确

image.png

2.2 调度器单线程任务添加

2.2.1 调用如下:

  1. static Logger::ptr g_logger = KIT_LOG_ROOT();
  2. void func1()
  3. {
  4. KIT_LOG_DEBUG(g_logger) << "func1 work!!!!!!!!!";
  5. }
  6. void func2()
  7. {
  8. KIT_LOG_DEBUG(g_logger) << "func2 work!!!!!!!!!";
  9. }
  10. int main()
  11. {
  12. //只有调度器这一个主线程在工作
  13. Scheduler sc("test");
  14. sc.schedule(&func1); //以函数形式入队
  15. Coroutine::ptr c1(new Coroutine(&func2)); //创建协程
  16. sc.schedule(c1); //以协程形式入队
  17. //开启调度器
  18. sc.start();
  19. //停止调度器
  20. sc.stop();
  21. KIT_LOG_INFO(g_logger) << "test over";
  22. return 0;
  23. }

运行结果如下:

image.png

2.2.2 小BUG:在start()之后添加任务,无法调度

原因:添加了调试debug信息后,发现start()后,直接从init协程跳转到了调度协程的run()中,当run()函数都执行完毕(此时调度完毕的标志是idle()返回),这时任务还没有来得及添加进队列,调度过程却已经结束了,等任务加入到队列也没有意义了。

  1. static Logger::ptr g_logger = KIT_LOG_ROOT();
  2. void func1()
  3. {
  4. KIT_LOG_DEBUG(g_logger) << "func1 work!!!!!!!!!";
  5. }
  6. void func2()
  7. {
  8. KIT_LOG_DEBUG(g_logger) << "func2 work!!!!!!!!!";
  9. }
  10. int main()
  11. {
  12. Scheduler sc("test");
  13. sc.start();
  14. /*在调度器开启后再添加任务*/
  15. sc.schedule(&func1); //以函数形式入队
  16. Coroutine::ptr c1(new Coroutine(&func2)); //创建协程
  17. sc.schedule(c1); //以协程形式入队
  18. sc.stop();
  19. KIT_LOG_INFO(g_logger) << "test over";
  20. return 0;
  21. }

image.png

2.2.3 改进实现功能:不管任务添加时机在start前还是后都要能够调度到任务。

  • 方案1:将run()开始执行的时间延后到stop()里面,stop()之前添加的任务都能调度

  • 方案2:是一个典型的生产者-消费者的模型。可以考虑使用信号量+互斥锁

2.3 调度器多线程任务添加

2.3.1 调用如下:

  1. static Logger::ptr g_logger = KIT_LOG_ROOT();
  2. void func1()
  3. {
  4. KIT_LOG_DEBUG(g_logger) << "func1 work!!!!!!!!!";
  5. }
  6. void func2()
  7. {
  8. KIT_LOG_DEBUG(g_logger) << "func2 work!!!!!!!!!";
  9. }
  10. void func3()
  11. {
  12. KIT_LOG_DEBUG(g_logger) << "func3 work!!!!!!!!!";
  13. }
  14. int main()
  15. {
  16. //创建额外的两个线程, 算上调度器的线程 共3个
  17. Scheduler sc("test", 3);
  18. /*调度器开启前加任务*/
  19. sc.schedule(&func3);
  20. sc.start();
  21. /*调度器开启后加任务*/
  22. sc.schedule(&func1);
  23. Coroutine::ptr c1(new Coroutine(&func2));
  24. sc.schedule(c1);
  25. sc.stop();
  26. KIT_LOG_INFO(g_logger) << "test over";
  27. return 0;
  28. }

2.3.2 运行结果如下:

可以看到3个任务,在不同的协程中运行。
image.png

2.3.3 改动:在stop()中对子线程进行回收

  1. 这里这么写的作用:
  1. 保证工作队列的线程安全
  2. 快速将工作队列清空 将线程资源拿到这来进行清理,等待子线程执行完毕

    1. void Scheduler::stop()
    2. {
    3. ...
    4. std::vector<Thread::ptr> threads;
    5. {
    6. MutexType::Lock lock(m_mutex);
    7. threads.swap(m_threads);
    8. }
    9. for(auto &x : threads)
    10. {
    11. x->join();
    12. }
    13. ...
    14. }

    2.4 调度器多线程将任务延迟添加

    循环的将任务延迟添加到任务队列中。

  • 不让**idle()**函数简单的结束,否则线程空转时间太短,观察不到切换:

只要调度器不停止,idle()空转函数就不停止,并且从idle_coroutine协程切回的时候,让协程的状态置为HOLD挂起状态,并非TERM终止状态。

  1. //协程空转函数
  2. bool Scheduler::idle()
  3. {
  4. while(!stopping())
  5. {
  6. KIT_LOG_INFO(g_logger) << "coroutine idle";
  7. //当前协程中途让出执行时间
  8. Coroutine::YieldToHold();
  9. }
  10. /*注意:退出循坏意味着 MainFunc/CallMainFunc 已经结束
  11. * 当前协程会被置为TERM状态
  12. */
  13. return 0;
  14. }

2.4.1 调用如下

  1. static Logger::ptr g_logger = KIT_LOG_ROOT();
  2. void func1()
  3. {
  4. KIT_LOG_DEBUG(g_logger) << "func1 work!!!!!!!!!";
  5. //抢到该任务的线程小睡一下
  6. usleep(100);
  7. //睡眠结束 往任务队列添加5个任务 共执行6次func1()任务
  8. static int i = 5;
  9. while(--i >= 0)
  10. Scheduler::GetThis()->schedule(&func1);
  11. }
  12. int main()
  13. {
  14. //额外开2个线程, 算上调度器主线程 共3个线程
  15. Scheduler sc("test", 3);
  16. //调度器开启
  17. sc.start();
  18. //添加任务
  19. sc.schedule(&func1);
  20. //调度器停止
  21. sc.stop();
  22. KIT_LOG_INFO(g_logger) << "test over";
  23. return 0;
  24. }

2.4.2 运行结果如下:

从结果来看,子线程256192(test_0)没有抢到任务,在执行”空转”idle(),执行中途切出Coroutine::YieldToHold()。切换顺序:test_0:0号协程——->test_0:2号协程——>test_0:0号协程
但是回到0号协程时候发生段错误。
image.png

2.4.3 BUG操作了一个空指针nullptr

使用GDB调试工具追踪到,Coroutine::setState()函数附近有操作空指针。由于此时在idle()函数中使用了Coroutine::YieldToHold()已经把协程状态置为了HOLD,会进入条件判断中,操作了一个空结构体CoroutineObject coco.cor->setState()
image.png
image.png

2.4.4 修正后运行结果如下:

基本实现了多线程多协程间的切换。
image.png
image.png

2.4.5 时序逻辑如下:

主线程259594(test)

调度器构造Scheduler():
1. 创建init协程:0号协程
1. 创建调度协程:1号协程


调度器.start():
初始化子线程
子线程259595(test_0) 子线程259596(test_1)

①开始执行run()函数

构造空转协程:2号协程
添加一个任务schedule()

执行调度器.stop():
开启调度器的调度call()


转到1号协程:
①执行run()函数


构造空转协程:3号协程

抢到一个任务:函数形式func
(第一个任务)


为函数构造一个协程以供调度:4号协程

swapIn():跳转任务中去执行

usleep(100)小睡一下


没有抢到任务
(唯一的任务被主线程拿走)


swapIn():跳转到idle()

YieldToHold():执行中切出
2号协程被挂起HOLD


②切回执行run()函数

………..

没有抢到任务
(唯一的任务被主线程拿走)


swapIn():跳转到idle()

YieldToHold():执行中切出
2号协程被挂起HOLD


③切回执行run()函数

………..

没有抢到任务
(唯一的任务被主线程拿走)


swapIn():跳转到idle()

YieldToHold():执行中切出
2号协程被挂起HOLD


④切回执行run()函数

………

没有抢到任务
(唯一的任务被主线程拿走)

睡眠结束
向任务队列添加5个任务
之前任务队列是空的,会触发一次唤醒tickle()


抢到一个任务:函数形式func1
(第二个任务)


复用4号协程以供调度

swapIn():跳转到任务去执行

任务执行结束,swapOut()
②切回执行run()函数



swapIn():跳转到idle()

YieldToHold():执行中切出
2号协程被挂起HOLD


⑤切回执行run()函数

抢到一个任务:函数形式func1
(第三个任务)


为函数构造一个协程以供调度:5号协程

swapIn():跳转到任务去执行

任务执行结束,swapOut()
⑥切回执行run()函数



①开始执行run()函数
抢到一个任务:函数形式func1
(第四个任务)




构造空转协程:6号协程
复用4号协程以供调度

swapIn():跳转到任务去执行

任务执行结束,swapOut()
②切回执行run()函数




抢到一个任务:函数形式func1
(第五个任务)


为函数构造一个协程以供调度:7号协程


swapIn():跳转到任务去执行


任务执行结束,swapOut()
②切回执行run()函数

抢到一个任务:函数形式func1
(第六个任务)


复用5号协程以供调度

swapIn():跳转到任务去执行

任务执行结束,swapOut()
⑥切回执行run()函数



……..


idle_coroutine TERM!
退出调度循环,线程退出
7号协程析构
6号协程析构


……..
……..

idle_coroutine TERM!
退出调度循环
4号协程析构
3号协程析构


返回到stop()函数中
回收子线程资源join()



……..

idle_coroutine TERM!
退出调度循环,线程退出
5号协程析构
2号协程析构

资源回收结束返回main()函数

程序结束

1号协程析构
0号协程析构


2.5 调度器多线程给任务指定线程执行

2.5.1 调用如下:

func1()中添加任务时指定当前运行线程为下一次的执行线程

  1. static Logger::ptr g_logger = KIT_LOG_ROOT();
  2. void func1()
  3. {
  4. KIT_LOG_DEBUG(g_logger) << "func1 work!!!!!!!!!";
  5. sleep(1);
  6. static int i = 5;
  7. if(--i >= 0)
  8. Scheduler::GetThis()->schedule(&func1, GetThreadId());
  9. }
  10. int main()
  11. {
  12. Scheduler sc("test", 3, false);
  13. sc.start();
  14. sc.schedule(&func1);
  15. sc.stop();
  16. KIT_LOG_INFO(g_logger) << "test over";
  17. return 0;
  18. }

2.5.2 运行结果如下:

每一个任务都指定了运行的线程,这样就无法发生线程的切换。每一个任务都是由262332(test_1)线程去执行完成的。
image.png

3. IO调度测试

3.1 协程调度

目的:看继承之后的子类IOManager是否能正常调度,保证原有Scheduler的功能不被破坏

3.1.1 调用如下:

  1. static Logger::ptr g_logger = KIT_LOG_ROOT();
  2. void func1()
  3. {
  4. KIT_LOG_INFO(g_logger) << "func1 work!!!!!!!!!!";
  5. }
  6. void func2()
  7. {
  8. KIT_LOG_INFO(g_logger) << "func2 work!!!!!!!!!!";
  9. }
  10. int main()
  11. {
  12. Thread::_setName("test");
  13. //创建IO调度器
  14. KIT_LOG_INFO(g_logger) << "test begin";
  15. IOManager iom;
  16. //添加函数任务
  17. iom.schedule(&func1);
  18. //添加协程任务
  19. Coroutine::ptr cor(new Coroutine(&func2));
  20. iom.schedule(cor);
  21. KIT_LOG_INFO(g_logger) << "test end";
  22. return 0;
  23. }

3.1.2 运行结果:

从结果来看,调度器的开启start()、停止stop()、添加/调度任务功能都是正常的。
image.png

3.2 单线程IO调度

目的:让IO调度器操作句柄以及能够触发句柄对应事件上的回调函数

方法:创建一个socket套接字去连接一个TCP服务器(工具伪装的)

  • 技巧点:由于要涉及网络IO,使用了一个网络传输的调试小工具,可以伪装TCP服务器/TCP客户端/UDP终端,方便调试IO的读写事件触发

image.png

3.2.1 添加句柄调用如下:

  1. 把这个IO加入到IOManager中去。
  2. 为这个socket套接字绑定两个回调函数,分别在这个IO上出现EPOLLIN读事件和EPOLLOUT写事件时自动触发执行。 ```cpp

    define PORT 8080 //服务器端口号

//任务 void func1() { KIT_LOG_INFO(g_logger) << “func1 work!!!!!!!!!!”; }

int main() { Thread::_setName(“test”); KIT_LOG_INFO(g_logger) << “test begin”; IOManager iom;

  1. iom.schedule(&func1);
  2. /*设置一个socket IO 去测试异步触发*/
  3. int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
  4. if(sock_fd < 0)
  5. {
  6. std::cout << "socket create error:" << strerror(errno);
  7. return 0;
  8. }
  9. //固定的socket编程
  10. struct sockaddr_in sockaddr;
  11. bzero(&sockaddr, sizeof(struct sockaddr_in));
  12. sockaddr.sin_family = AF_INET;
  13. sockaddr.sin_port = htons(PORT);
  14. sockaddr.sin_addr.s_addr = inet_addr("192.168.77.1"); //连接模拟TCP服务器
  15. //为sock_fd 添加读事件回调
  16. iom.addEvent(sock_fd, IOManager::Event::READ, [](){
  17. KIT_LOG_INFO(g_logger) << "sock io read event!!";
  18. });
  19. //为sock_fd 添加写事件回调
  20. iom.addEvent(sock_fd, IOManager::Event::WRITE, [](){
  21. KIT_LOG_INFO(g_logger) << "sock io wire event!!";
  22. });
  23. //连接服务器
  24. if(connect(sock_fd, (struct sockaddr*)&sockaddr, sizeof(struct sockaddr)) < 0)
  25. {
  26. std::cout << "connect error:" << strerror(errno) << std::endl;
  27. }
  28. //把socket 置为非阻塞
  29. fcntl(sock_fd, F_SETFL, O_NONBLOCK);
  30. //发送一段数据 触发写事件
  31. send(sock_fd, "hello", 5, 0);
  32. KIT_LOG_INFO(g_logger) << "test end";
  33. return 0;

}

  1. <a name="beebd"></a>
  2. #### BUG:运行后没有发现触发了写事件
  3. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/25460685/1640519341539-d0fafc21-ea01-44bf-a5e7-b99e90d22d90.png#clientId=ua70afa40-b080-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=579&id=uc306e719&margin=%5Bobject%20Object%5D&name=image.png&originHeight=579&originWidth=1071&originalType=binary&ratio=1&rotation=0&showTitle=false&size=142816&status=done&style=none&taskId=u063a2d7e-5eba-4846-8a31-6589826d9f5&title=&width=1071)
  4. - 读事件被触发,经过`send`之后,TCP服务器也收到了发送的数据,TCP连接没有问题,数据收发正常:使用 `netstat -ntlap | grep test_io`查看当前进程的IO连接情况:成功建立。
  5. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/25460685/1640519612966-25aad4dd-4264-4c1a-9560-9e81972d79e8.png#clientId=ua70afa40-b080-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=94&id=u50296129&margin=%5Bobject%20Object%5D&name=image.png&originHeight=94&originWidth=795&originalType=binary&ratio=1&rotation=0&showTitle=false&size=17728&status=done&style=none&taskId=u9c8b6a5f-0353-4e08-afb1-c3cf2edf8a6&title=&width=795)
  6. - 现象:写事件绑定的回调没有被执行。并且线程在`idle()`函数中陷入`epoll_wait()`函数死循环。由于添加了写事件回调,而该回调没有被正常消耗执行掉,导致`m_pendingEventCount`待处理事件数量没有为0,`IOManager::stopping()`终止函数无法终止,调度器无法关闭。
  7. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/25460685/1640520637605-fc17754a-f42a-49d4-98f1-d2e99e3fa55e.png#clientId=ua70afa40-b080-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=249&id=u51dc8dd6&margin=%5Bobject%20Object%5D&name=image.png&originHeight=249&originWidth=908&originalType=binary&ratio=1&rotation=0&showTitle=false&size=54285&status=done&style=none&taskId=uc9b41899-9298-42e5-83b5-726d8c18a04&title=&width=908)
  8. - 原因:将自定义的事件`Event`和现有`epoll_event`混为使用,导致了放入到`epoll`中的事件不能正确触发:
  9. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/25460685/1640520829462-5b1ae12f-7d2b-4b04-8837-e6ac67a2e7ca.png#clientId=ua70afa40-b080-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=488&id=u22f1a92a&margin=%5Bobject%20Object%5D&name=image.png&originHeight=488&originWidth=993&originalType=binary&ratio=1&rotation=0&showTitle=false&size=69168&status=done&style=none&taskId=u2d6c7614-beac-4f44-80f6-29b1c44ccdd&title=&width=993)
  10. - 要么将这二者统一,要么将二者完全分离。选择前者,将其统一。其他使用到的地方,也照做
  11. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/25460685/1640520861141-6aea86da-a757-406f-9bd3-87d3fd5ac210.png#clientId=ua70afa40-b080-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=457&id=u7b31048a&margin=%5Bobject%20Object%5D&name=image.png&originHeight=457&originWidth=958&originalType=binary&ratio=1&rotation=0&showTitle=false&size=52890&status=done&style=none&taskId=uee141b80-ce9e-4589-9d9c-88dbca569d4&title=&width=958)
  12. <a name="roulw"></a>
  13. #### 修正后运行结果如下:
  14. ![image.png](https://cdn.nlark.com/yuque/0/2021/png/25460685/1640521087106-61c503b4-fe5c-4ec9-8aa9-771a012950b0.png#clientId=ua70afa40-b080-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=616&id=uf1cc3177&margin=%5Bobject%20Object%5D&name=image.png&originHeight=616&originWidth=1088&originalType=binary&ratio=1&rotation=0&showTitle=false&size=172475&status=done&style=none&taskId=u2da8998f-c511-468a-8595-e8eb97af3ae&title=&width=1088)
  15. <a name="Ns5ag"></a>
  16. ### 3.2.2 删除句柄调用如下:
  17. 在触发写事件之前,将之前设置在句柄上的读事件删除,使用TCP服务器发送消息触发IO读事件,但打印中不会出现读事件对应的回调函数。
  18. ```cpp
  19. static Logger::ptr g_logger = KIT_LOG_ROOT();
  20. #define PORT 8080
  21. void func1()
  22. {
  23. KIT_LOG_INFO(g_logger) << "func1 work!!!!!!!!!!";
  24. }
  25. int main()
  26. {
  27. Thread::_setName("test");
  28. KIT_LOG_INFO(g_logger) << "test begin";
  29. IOManager iom;
  30. iom.schedule(&func1);
  31. /*设置一个socket IO 去测试异步触发*/
  32. int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
  33. if(sock_fd < 0)
  34. {
  35. std::cout << "socket create error:" << strerror(errno);
  36. return 0;
  37. }
  38. struct sockaddr_in sockaddr;
  39. bzero(&sockaddr, sizeof(struct sockaddr_in));
  40. sockaddr.sin_family = AF_INET;
  41. sockaddr.sin_port = htons(PORT);
  42. sockaddr.sin_addr.s_addr = inet_addr("192.168.77.1"); //连接模拟TCP服务器
  43. iom.addEvent(sock_fd, IOManager::Event::READ, [](){
  44. KIT_LOG_INFO(g_logger) << "sock io read event!!";
  45. });
  46. iom.addEvent(sock_fd, IOManager::Event::WRITE, [](){
  47. KIT_LOG_INFO(g_logger) << "sock io wire event!!";
  48. });
  49. if(connect(sock_fd, (struct sockaddr*)&sockaddr, sizeof(struct sockaddr)) < 0)
  50. {
  51. std::cout << "connect error:" << strerror(errno) << std::endl;
  52. }
  53. //把socket 置为非阻塞
  54. fcntl(sock_fd, F_SETFL, O_NONBLOCK);
  55. //删除之前在句柄上设置的读事件
  56. iom.delEvent(sock_fd, IOManager::Event::READ);
  57. send(sock_fd, "hello", 5, 0);
  58. KIT_LOG_INFO(g_logger) << "test end";
  59. return 0;
  60. }

小BUG:~~epoll_ctl()~~返回错误:不被允许的参数。

原因:~~epoll_event~~中的~~events~~类型为~~uint32_t~~32位无符号整数,但是自定义的枚举类型默认为~~int~~32位有符号整数,在做位运算时会有负数/0值,导致传入到~~epoll~~中的时候出现问题。
image.png

  • C++11允许指定枚举类型:指定为8位无符号整数

image.png

修正后运行结果如下:

image.png


以上的小BUG的现象:确实是epoll_event传参不对导致的Invalid argument,即我们自定义枚举类型Event的问题。而真正原因并非枚举类型指定数据类型的问题,无需将其指定为uint无符号整型。真正导致BUG的原因是:

书写主动触发事件函数triggerEvent()时,需要把FdContext中的主动触发的事件去除,这个语句手误写错。导致不能正常将事件去除而重复触发/不触发/错误触发。
image.png

  • 正确写法:

image.png

3.2.3 取消句柄事件调用如下:

在触发写事件之前,将之前设置在句柄上的写事件取消,则打印中没有send也会触发回调。

  1. static Logger::ptr g_logger = KIT_LOG_ROOT();
  2. #define PORT 8080
  3. void func1()
  4. {
  5. KIT_LOG_INFO(g_logger) << "func1 work!!!!!!!!!!";
  6. }
  7. int main()
  8. {
  9. Thread::_setName("test");
  10. KIT_LOG_INFO(g_logger) << "test begin";
  11. IOManager iom;
  12. iom.schedule(&func1);
  13. /*设置一个socket IO 去测试异步触发*/
  14. int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
  15. if(sock_fd < 0)
  16. {
  17. std::cout << "socket create error:" << strerror(errno);
  18. return 0;
  19. }
  20. struct sockaddr_in sockaddr;
  21. bzero(&sockaddr, sizeof(struct sockaddr_in));
  22. sockaddr.sin_family = AF_INET;
  23. sockaddr.sin_port = htons(PORT);
  24. sockaddr.sin_addr.s_addr = inet_addr("192.168.77.1"); //连接百度
  25. iom.addEvent(sock_fd, IOManager::Event::READ, [](){
  26. KIT_LOG_INFO(g_logger) << "sock io read event!!";
  27. });
  28. iom.addEvent(sock_fd, IOManager::Event::WRITE, [](){
  29. KIT_LOG_INFO(g_logger) << "sock io wire event!!";
  30. });
  31. if(connect(sock_fd, (struct sockaddr*)&sockaddr, sizeof(struct sockaddr)) < 0)
  32. {
  33. std::cout << "connect error:" << strerror(errno) << std::endl;
  34. }
  35. //把socket 置为非阻塞
  36. fcntl(sock_fd, F_SETFL, O_NONBLOCK);
  37. //取消设置好的的读事件回调
  38. iom.cancelEvent(sock_fd, IOManager::Event::READ);
  39. send(sock_fd, "hello", 5, 0);
  40. KIT_LOG_DEBUG(g_logger) << "send end";
  41. KIT_LOG_INFO(g_logger) << "test end";
  42. return 0;
  43. }

运行结果如下:

需要注意的是:由于我们这里自己手动添加了两个事件,因此需要把这两个事件都处理完,IO调度器才会停止退出。处理完的标志要么这个IO接口满足条件触发,要么手动将这个描述符上的事件取消。如果TCP服务器不发来消息,那么注册的读事件永远不触发,m_pendingEventCount待处理事件数量永远不为0,IOManager::stoppping()永远返回false
因此,这里的情况是读事件被取消;写事件是由send向服务器发送了数据满足条件触发。
image.png

3.3 多线程IO调度

3.3.1 主线程参与调度

在主线程中,构造IO调度器IOManager,并且通过该调度器添加一个任务schedule(&func1)
在主线程创建一个网络IOsocket,并在其描述符sock_fd上绑定了两个事件触发:读事件触发和写事件触发。用这个套接字去连接一个模拟的TCP服务器,并且send发送一条消息。

调用如下:

  1. static Logger::ptr g_logger = KIT_LOG_ROOT();
  2. #define PORT 8080
  3. void func1()
  4. {
  5. KIT_LOG_INFO(g_logger) << "func1 work!!!!!!!!!!";
  6. }
  7. int main()
  8. {
  9. Thread::_setName("test");
  10. KIT_LOG_INFO(g_logger) << "test begin";
  11. IOManager iom("test", 2, false);
  12. iom.schedule(&func1);
  13. /*设置一个socket IO 去测试异步触发*/
  14. int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
  15. if(sock_fd < 0)
  16. {
  17. std::cout << "socket create error:" << strerror(errno);
  18. return 0;
  19. }
  20. struct sockaddr_in sockaddr;
  21. bzero(&sockaddr, sizeof(struct sockaddr_in));
  22. sockaddr.sin_family = AF_INET;
  23. sockaddr.sin_port = htons(PORT);
  24. sockaddr.sin_addr.s_addr = inet_addr("192.168.77.1"); //连接百度
  25. iom.addEvent(sock_fd, IOManager::Event::READ, [](){
  26. KIT_LOG_INFO(g_logger) << "sock io read event!!";
  27. });
  28. iom.addEvent(sock_fd, IOManager::Event::WRITE, [](){
  29. KIT_LOG_INFO(g_logger) << "sock io wire event!!";
  30. });
  31. if(connect(sock_fd, (struct sockaddr*)&sockaddr, sizeof(struct sockaddr)) < 0)
  32. {
  33. std::cout << "connect error:" << strerror(errno) << std::endl;
  34. }
  35. //把socket 置为非阻塞
  36. fcntl(sock_fd, F_SETFL, O_NONBLOCK);
  37. send(sock_fd, "hello", 5, 0);
  38. KIT_LOG_DEBUG(g_logger) << "send end";
  39. KIT_LOG_INFO(g_logger) << "test end";
  40. return 0;
  41. }

运行结果如下:

这里选择的是把主线程也纳入调度中,因此可以看到主线程也在”抢”任务执行。
image.png

3.3.2 主线程不参与调度

在主线程中,构造IO调度器IOManager,并且通过该调度器添加一个任务schedule(&func1)
在任务里(也就是把socket的创建放在子线程中去,谁抢到任务谁会send发送数据)创建一个网络IOsocket,并在其描述符sock_fd上绑定了两个事件触发:读事件触发和写事件触发。用这个套接字去连接一个模拟的TCP服务器,并且send发送一条消息。

注意:这里必须把addEvent套接字绑定事件触发,放到子线程中去执行,因为此时主线程已经不参与调度,不会初始化线程局部变量t_shceduelr,而是要等到子线程运行起来把调度器的指针设置好了才行。否则addEvent()函数中会拿到一个空指针赋值给FdContext,造成段错误很危险!

调用如下:

  1. static Logger::ptr g_logger = KIT_LOG_ROOT();
  2. #define PORT 8080
  3. void func1()
  4. {
  5. KIT_LOG_INFO(g_logger) << "func1 work!!!!!!!!!!";
  6. }
  7. void func2()
  8. {
  9. KIT_LOG_INFO(g_logger) << "func2 work!!!!!!!!!!";
  10. }
  11. void netio_test()
  12. {
  13. /*设置一个socket IO 去测试异步触发*/
  14. int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
  15. if(sock_fd < 0)
  16. {
  17. std::cout << "socket create error:" << strerror(errno);
  18. return;
  19. }
  20. struct sockaddr_in sockaddr;
  21. bzero(&sockaddr, sizeof(struct sockaddr_in));
  22. sockaddr.sin_family = AF_INET;
  23. sockaddr.sin_port = htons(PORT);
  24. sockaddr.sin_addr.s_addr = inet_addr("192.168.77.1"); //连接百度
  25. IOManager::GetThis()->addEvent(sock_fd, IOManager::Event::READ, [](){
  26. KIT_LOG_INFO(g_logger) << "sock io read event!!";
  27. });
  28. IOManager::GetThis()->addEvent(sock_fd, IOManager::Event::WRITE, [](){
  29. KIT_LOG_INFO(g_logger) << "sock io wire event!!";
  30. });
  31. if(connect(sock_fd, (struct sockaddr*)&sockaddr, sizeof(struct sockaddr)) < 0)
  32. {
  33. std::cout << "connect error:" << strerror(errno) << std::endl;
  34. }
  35. //把socket 置为非阻塞
  36. fcntl(sock_fd, F_SETFL, O_NONBLOCK);
  37. send(sock_fd, "hello", 5, 0);
  38. KIT_LOG_DEBUG(g_logger) << "send end";
  39. }
  40. int main()
  41. {
  42. Thread::_setName("test");
  43. KIT_LOG_INFO(g_logger) << "test begin";
  44. IOManager iom("test", 3, false);
  45. iom.schedule(&func1);
  46. iom.schedule(&func2);
  47. iom.schedule(&netio_test);
  48. KIT_LOG_INFO(g_logger) << "test end";
  49. return 0;
  50. }

运行结果如下:

整个过程中主线程19955:test都没有参与到任务”竞争”与”执行”的过程中,仅仅是开辟了3个子协程(线程)然后等待他们全部执行完毕回收清理。
image.png
image.png

4. 定时器测试

4.1 定时器添加

调用如下:

  1. static Logger::ptr g_logger = KIT_LOG_NAME("root");
  2. int main()
  3. {
  4. KIT_LOG_DEBUG(g_logger) << "test begin";
  5. IOManager iom("test", 2);
  6. auto f1 = [=](){
  7. KIT_LOG_INFO(g_logger) << "hello timer!!";
  8. };
  9. auto f2 = [=](){
  10. KIT_LOG_INFO(g_logger) << "hello recurring timer!!";
  11. };
  12. //创建一个一次性定时器
  13. iom.addTimer(1000, f1, false);
  14. //创建一个循环定时器
  15. iom.addTimer(3000, f2, true);
  16. KIT_LOG_DEBUG(g_logger) << "test end";
  17. return 0;
  18. }

运行结果如下:

image.png

4.2 定时器取消

调用如下:

  1. static Logger::ptr g_logger = KIT_LOG_NAME("root");
  2. int main()
  3. {
  4. KIT_LOG_DEBUG(g_logger) << "test begin";
  5. IOManager iom("test", 2);
  6. //创建一个循环定时器 必须使用static
  7. static Timer::ptr t = iom.addTimer(1000, [](){
  8. static int i = 0;
  9. KIT_LOG_INFO(g_logger) << "hello timer!!, i = " << i;
  10. //循环调用三次后取消该定时器
  11. if(++i == 3)
  12. t->cancel();
  13. }, true);
  14. KIT_LOG_DEBUG(g_logger) << "test end";
  15. return 0;
  16. }

运行结果如下:

image.png

4.3 定时器重新设定间隔时间

调用如下:

  1. static Logger::ptr g_logger = KIT_LOG_NAME("root");
  2. int main()
  3. {
  4. KIT_LOG_DEBUG(g_logger) << "test begin";
  5. IOManager iom("test", 2);
  6. //创建一个循环定时器 必须使用static
  7. static Timer::ptr t = iom.addTimer(1000, [](){
  8. static int i = 0;
  9. KIT_LOG_INFO(g_logger) << "hello timer!!, i = " << i;
  10. //循环调用三次后 将该定时器间隔时间重置为3000ms
  11. if(++i == 3)
  12. t->reset(3000, true);
  13. }, true);
  14. KIT_LOG_DEBUG(g_logger) << "test end";
  15. return 0;
  16. }

运行结果如下:

image.png