1. 协程类封装测试
1.1 协程基本切换功能
目的:检查协程的基本切换逻辑是否正常。
1.1.1 调用如下:
void func_in()
{
KIT_LOG_INFO(KIT_LOG_ROOT()) << "func1 begin";
Coroutine::YieldToHold();
KIT_LOG_INFO(KIT_LOG_ROOT()) << "func1 end";
Coroutine::YieldToHold();
}
int main()
{
//这个语句的存在为了初始化母协程 有点瑕疵
Coroutine::GetThis();
KIT_LOG_INFO(KIT_LOG_ROOT()) << "coroutine test begin";
Coroutine::ptr cor1(new Coroutine(func_in));
cor1->swapIn();
KIT_LOG_INFO(KIT_LOG_ROOT()) << "1 swapIn after";
cor1->swapIn();
KIT_LOG_INFO(KIT_LOG_ROOT()) << "2 swapIn after";
KIT_LOG_INFO(KIT_LOG_ROOT()) << "coroutine test end";
return 0;
}
1.1.2 时序逻辑如下:
1.1.4 BUG:有两个协程对象,但是只有一个协程被析构,内存泄漏了
原因:在协程执行的回调函数里,最后手动的让子协程切换回主协程中,使其没有退出相应的作用域,触发不了智能指针引用计数减少,导致最后无法自动释放空间。
1.1.4.1 现象:
1.1.4.2 解决办法:从智能指针对象拿出裸指针来使用
追问:万一此时恰好智能指针管理的对象引用次数为0,不就释放空间了吗?操作了一个空指针nullptr必然段错误呀!
其实不然,cur
所指向的空间完全不可能在p->swapOut()
前就释放,因为在静态函数MainFunc()
中,获取的是当前正在运行的Coroutine对象
this指针的智能指针,这里get()
取出裸指针正是要减少一次对当前运行协程的”引用计数”,否则退出了该函数,智能指针cur
将永远留下来,造成内存泄漏。
区别:空指针能调用成员函数的情况。如下一个空的类对象指针依然能调成员函数: ```cpp class A { public:
void func() {
cout << 6666 << endl;
}
private: int a; };
int main() { A *a = nullptr; a->func();
return 0;
}

<a name="gBdIN"></a>
## 1.2 多线程下协程的切换
<a name="RiRjO"></a>
### 1.2.1 调用如下:
```cpp
void test_coroutine()
{
KIT_LOG_INFO(KIT_LOG_ROOT()) << "sub_thread test begin";
//初始化母协程
Coroutine::GetThis();
//创建一个子协程
Coroutine::ptr cor1(new Coroutine(func_in));
cor1->swapIn();
KIT_LOG_INFO(KIT_LOG_ROOT()) << "1 swapIn after";
cor1->swapIn();
KIT_LOG_INFO(KIT_LOG_ROOT()) << "2 swapIn after";
KIT_LOG_INFO(KIT_LOG_ROOT()) << "sub_thread test end";
}
int main()
{
Thread::_setName("main thread");
vector<Thread::ptr> mv;
//启动3个线程 每个线程上有2个协程:1个母协程 1个子协程
for(int i = 0; i < 3;i++)
{
Thread::ptr p(new Thread(&test_coroutine, "t_" + to_string(i)));
mv.emplace_back(p);
}
for(auto &x : mv)
x->join();
return 0;
}
1.2.2 运行结果如下:
2. 协程调度测试
2.1 调度器的创建、开启、停止
2.1.1 调用如下:
int main()
{
//将调度器命名为"test"
Scheduler sc("test");
//开启调度器
sc.start();
//结束调度器
sc.stop();
return 0;
}
2.1.2 BUG运行结果如下
可以看到触发了一个异常:bad_function_call
这个异常是因为函数包装器function<>
的内容是空nullptr
而我们发现它被企图调用。
说明此时的问题:主线程中的0号协程类对象中的function<>
没有被赋值内容,0号协程不应该会去执行MainFunc()
,进一步说明调度逻辑有问题。
2.1.3 BUG一番修改后,再一次运行结果如下:
发现代码跳转陷入到一个循坏中,发生了隐性的一个代码逻辑成环的一个情况。
- 以上结果的代码的逻辑如下:
2.1.4 导致死循环的问题代码块:
**swapin()**
函数:(init母协程)init_cor_sp->m_ctx
————-切换————->(当前协程)this->m_ctx
```cpp void Coroutine::swapIn() { SetThis(this); //没在运行态才能 调入运行 KIT_ASSERT(m_state != State::EXEC);m_state = State::EXEC; if(swapcontext(&init_cor_sp->m_ctx, &m_ctx) < 0) {
KIT_LOG_ERROR(g_logger) << "swapIn: swapcontext error";
KIT_ASSERT2(false, "swapcontext error");
}
}
- `**swapout()**`**函数分两种情况:**
- 当前协程**不是**调度器协程:
`(当前协程)this->m_ctx`---------切换--------->`(调度器协程)Scheduler::GetMainCor()->m_ctx`
- 当前协程**是**调度器协程:
`(当前协程)this->m_ctx`---------切换--------->`(init母协程)init_cor_sp->m_ctx`
```cpp
void Coroutine::swapOut()
{
if(this != Scheduler::GetMainCor())
{
SetThis(Scheduler::GetMainCor());
if(swapcontext(&m_ctx, &Scheduler::GetMainCor()->m_ctx) < 0)
{
KIT_LOG_ERROR(g_logger) << "swapOut: swapcontext error";
KIT_ASSERT2(false, "swapcontext error");
}
}
else
{
SetThis(init_cor_sp.get());
if(swapcontext(&m_ctx, &init_cor_sp->m_ctx) < 0)
{
KIT_LOG_ERROR(g_logger) << "swapOut: swapcontext error";
KIT_ASSERT2(false, "swapcontext error");
}
}
}
成因:swapin()
和swapout()
切换代码序列时存储切出位置没有对应正确,从run()
切换swapIn()
要保存的m_ctx
本应该在Scheduler对象
中但跑到了init_cor_sp
中;从MainFunc()----idle()
切换swapOut()
选择了从Scheduler对象
中切回恢复,但此时里面保存的根本不是上一次正确的应该切回的代码序列。
解决要点:让切出、切回的代码序列对应起来,不要把不同代码序列的切换混在一起即可。
2.1.5 将swapIn()
函数修改后
//从调度器 切换到 到目标代码序列
void Coroutine::swapIn()
{
SetThis(this);
//没在运行态才能 调入运行
KIT_ASSERT(m_state == State::INIT || m_state == State::HOLD);
m_state = State::EXEC;
// if(swapcontext(&init_cor_sp->m_ctx, &m_ctx) < 0)
// {
// KIT_LOG_ERROR(g_logger) << "swapIn: swapcontext error";
// KIT_ASSERT2(false, "swapcontext error");
// }
if(swapcontext(&Scheduler::GetMainCor()->m_ctx, &m_ctx) < 0)
{
KIT_LOG_ERROR(g_logger) << "swapIn: swapcontext error";
KIT_ASSERT2(false, "swapcontext error");
}
}
运行结果如下:已经基本正确
2.2 调度器单线程任务添加
2.2.1 调用如下:
static Logger::ptr g_logger = KIT_LOG_ROOT();
void func1()
{
KIT_LOG_DEBUG(g_logger) << "func1 work!!!!!!!!!";
}
void func2()
{
KIT_LOG_DEBUG(g_logger) << "func2 work!!!!!!!!!";
}
int main()
{
//只有调度器这一个主线程在工作
Scheduler sc("test");
sc.schedule(&func1); //以函数形式入队
Coroutine::ptr c1(new Coroutine(&func2)); //创建协程
sc.schedule(c1); //以协程形式入队
//开启调度器
sc.start();
//停止调度器
sc.stop();
KIT_LOG_INFO(g_logger) << "test over";
return 0;
}
运行结果如下:
2.2.2 小BUG:在start()
之后添加任务,无法调度
原因:添加了调试debug信息后,发现start()
后,直接从init协程
跳转到了调度协程的run()
中,当run()
函数都执行完毕(此时调度完毕的标志是idle()
返回),这时任务还没有来得及添加进队列,调度过程却已经结束了,等任务加入到队列也没有意义了。
static Logger::ptr g_logger = KIT_LOG_ROOT();
void func1()
{
KIT_LOG_DEBUG(g_logger) << "func1 work!!!!!!!!!";
}
void func2()
{
KIT_LOG_DEBUG(g_logger) << "func2 work!!!!!!!!!";
}
int main()
{
Scheduler sc("test");
sc.start();
/*在调度器开启后再添加任务*/
sc.schedule(&func1); //以函数形式入队
Coroutine::ptr c1(new Coroutine(&func2)); //创建协程
sc.schedule(c1); //以协程形式入队
sc.stop();
KIT_LOG_INFO(g_logger) << "test over";
return 0;
}
2.2.3 改进实现功能:不管任务添加时机在start
前还是后都要能够调度到任务。
方案1:将
run()
开始执行的时间延后到stop()
里面,stop()
之前添加的任务都能调度方案2:是一个典型的生产者-消费者的模型。可以考虑使用信号量+互斥锁
2.3 调度器多线程任务添加
2.3.1 调用如下:
static Logger::ptr g_logger = KIT_LOG_ROOT();
void func1()
{
KIT_LOG_DEBUG(g_logger) << "func1 work!!!!!!!!!";
}
void func2()
{
KIT_LOG_DEBUG(g_logger) << "func2 work!!!!!!!!!";
}
void func3()
{
KIT_LOG_DEBUG(g_logger) << "func3 work!!!!!!!!!";
}
int main()
{
//创建额外的两个线程, 算上调度器的线程 共3个
Scheduler sc("test", 3);
/*调度器开启前加任务*/
sc.schedule(&func3);
sc.start();
/*调度器开启后加任务*/
sc.schedule(&func1);
Coroutine::ptr c1(new Coroutine(&func2));
sc.schedule(c1);
sc.stop();
KIT_LOG_INFO(g_logger) << "test over";
return 0;
}
2.3.2 运行结果如下:
可以看到3个任务,在不同的协程中运行。
2.3.3 改动:在stop()
中对子线程进行回收
这里这么写的作用:
- 保证工作队列的线程安全
快速将工作队列清空 将线程资源拿到这来进行清理,等待子线程执行完毕
void Scheduler::stop()
{
...
std::vector<Thread::ptr> threads;
{
MutexType::Lock lock(m_mutex);
threads.swap(m_threads);
}
for(auto &x : threads)
{
x->join();
}
...
}
2.4 调度器多线程将任务延迟添加
循环的将任务延迟添加到任务队列中。
- 不让
**idle()**
函数简单的结束,否则线程空转时间太短,观察不到切换:
只要调度器不停止,idle()
空转函数就不停止,并且从idle_coroutine
协程切回的时候,让协程的状态置为HOLD
挂起状态,并非TERM
终止状态。
//协程空转函数
bool Scheduler::idle()
{
while(!stopping())
{
KIT_LOG_INFO(g_logger) << "coroutine idle";
//当前协程中途让出执行时间
Coroutine::YieldToHold();
}
/*注意:退出循坏意味着 MainFunc/CallMainFunc 已经结束
* 当前协程会被置为TERM状态
*/
return 0;
}
2.4.1 调用如下
static Logger::ptr g_logger = KIT_LOG_ROOT();
void func1()
{
KIT_LOG_DEBUG(g_logger) << "func1 work!!!!!!!!!";
//抢到该任务的线程小睡一下
usleep(100);
//睡眠结束 往任务队列添加5个任务 共执行6次func1()任务
static int i = 5;
while(--i >= 0)
Scheduler::GetThis()->schedule(&func1);
}
int main()
{
//额外开2个线程, 算上调度器主线程 共3个线程
Scheduler sc("test", 3);
//调度器开启
sc.start();
//添加任务
sc.schedule(&func1);
//调度器停止
sc.stop();
KIT_LOG_INFO(g_logger) << "test over";
return 0;
}
2.4.2 运行结果如下:
从结果来看,子线程256192(test_0)
没有抢到任务,在执行”空转”idle()
,执行中途切出Coroutine::YieldToHold()
。切换顺序:test_0:0号协程——->test_0:2号协程——>test_0:0号协程
但是回到0号协程时候发生段错误。
2.4.3 BUG操作了一个空指针nullptr
使用GDB调试工具追踪到,Coroutine::setState()
函数附近有操作空指针。由于此时在idle()
函数中使用了Coroutine::YieldToHold()
已经把协程状态置为了HOLD
,会进入条件判断中,操作了一个空结构体CoroutineObject co
,co.cor->setState()
,
2.4.4 修正后运行结果如下:
2.4.5 时序逻辑如下:
主线程259594(test) | ||
---|---|---|
调度器构造Scheduler(): 1. 创建 init协程 :0号协程1. 创建 调度协程 :1号协程 |
||
调度器.start() :初始化子线程 |
子线程259595(test_0) | 子线程259596(test_1) |
①开始执行run() 函数 |
||
构造空转协程:2号协程 | ||
添加一个任务schedule() |
||
执行调度器.stop() :开启调度器的调度 call() |
||
转到1号协程: ①执行 run() 函数 |
||
构造空转协程:3号协程 | ||
抢到一个任务:函数形式func (第一个任务) |
||
为函数构造一个协程以供调度:4号协程 | ||
swapIn() :跳转任务中去执行 |
||
usleep(100) 小睡一下 |
||
没有抢到任务 (唯一的任务被主线程拿走) |
||
swapIn() :跳转到idle() 中 |
||
YieldToHold() :执行中切出2号协程被挂起 HOLD |
||
②切回执行run() 函数 |
||
……….. | ||
没有抢到任务 (唯一的任务被主线程拿走) |
||
swapIn() :跳转到idle() 中 |
||
YieldToHold() :执行中切出2号协程被挂起 HOLD |
||
③切回执行run() 函数 |
||
……….. | ||
没有抢到任务 (唯一的任务被主线程拿走) |
||
swapIn() :跳转到idle() 中 |
||
YieldToHold() :执行中切出2号协程被挂起 HOLD |
||
④切回执行run() 函数 |
||
……… | ||
没有抢到任务 (唯一的任务被主线程拿走) |
||
睡眠结束 向任务队列添加5个任务 之前任务队列是空的,会触发一次唤醒 tickle() |
||
抢到一个任务:函数形式func1 (第二个任务) |
||
复用4号协程以供调度 | ||
swapIn() :跳转到任务去执行 |
||
任务执行结束,swapOut() ②切回执行 run() 函数 |
||
swapIn() :跳转到idle() 中 |
||
YieldToHold() :执行中切出2号协程被挂起 HOLD |
||
⑤切回执行run() 函数 |
||
抢到一个任务:函数形式func1 (第三个任务) |
||
为函数构造一个协程以供调度:5号协程 | ||
swapIn() :跳转到任务去执行 |
||
任务执行结束,swapOut() ⑥切回执行 run() 函数 |
||
①开始执行run() 函数 |
||
抢到一个任务:函数形式func1 (第四个任务) |
||
构造空转协程:6号协程 | ||
复用4号协程以供调度 | ||
swapIn() :跳转到任务去执行 |
||
任务执行结束,swapOut() ②切回执行 run() 函数 |
||
抢到一个任务:函数形式func1 (第五个任务) |
||
为函数构造一个协程以供调度:7号协程 | ||
swapIn() :跳转到任务去执行 |
||
任务执行结束,swapOut() ②切回执行 run() 函数 |
||
抢到一个任务:函数形式func1 (第六个任务) |
||
复用5号协程以供调度 | ||
swapIn() :跳转到任务去执行 |
||
任务执行结束,swapOut() ⑥切回执行 run() 函数 |
||
…….. | ||
idle_coroutine TERM! 退出调度循环,线程退出 7号协程析构 6号协程析构 |
||
…….. | ||
…….. | ||
idle_coroutine TERM! 退出调度循环 4号协程析构 3号协程析构 |
||
返回到stop() 函数中回收子线程资源 join() |
||
…….. | ||
idle_coroutine TERM! 退出调度循环,线程退出 5号协程析构 2号协程析构 |
||
资源回收结束返回main() 函数 |
||
程序结束 | ||
1号协程析构 0号协程析构 |
2.5 调度器多线程给任务指定线程执行
2.5.1 调用如下:
在func1()
中添加任务时指定当前运行线程为下一次的执行线程
static Logger::ptr g_logger = KIT_LOG_ROOT();
void func1()
{
KIT_LOG_DEBUG(g_logger) << "func1 work!!!!!!!!!";
sleep(1);
static int i = 5;
if(--i >= 0)
Scheduler::GetThis()->schedule(&func1, GetThreadId());
}
int main()
{
Scheduler sc("test", 3, false);
sc.start();
sc.schedule(&func1);
sc.stop();
KIT_LOG_INFO(g_logger) << "test over";
return 0;
}
2.5.2 运行结果如下:
每一个任务都指定了运行的线程,这样就无法发生线程的切换。每一个任务都是由262332(test_1)
线程去执行完成的。
3. IO调度测试
3.1 协程调度
目的:看继承之后的子类IOManager
是否能正常调度,保证原有Scheduler
的功能不被破坏
3.1.1 调用如下:
static Logger::ptr g_logger = KIT_LOG_ROOT();
void func1()
{
KIT_LOG_INFO(g_logger) << "func1 work!!!!!!!!!!";
}
void func2()
{
KIT_LOG_INFO(g_logger) << "func2 work!!!!!!!!!!";
}
int main()
{
Thread::_setName("test");
//创建IO调度器
KIT_LOG_INFO(g_logger) << "test begin";
IOManager iom;
//添加函数任务
iom.schedule(&func1);
//添加协程任务
Coroutine::ptr cor(new Coroutine(&func2));
iom.schedule(cor);
KIT_LOG_INFO(g_logger) << "test end";
return 0;
}
3.1.2 运行结果:
从结果来看,调度器的开启start()
、停止stop()
、添加/调度任务功能都是正常的。
3.2 单线程IO调度
目的:让IO调度器操作句柄以及能够触发句柄对应事件上的回调函数
方法:创建一个socket
套接字去连接一个TCP服务器(工具伪装的)
- 技巧点:由于要涉及网络IO,使用了一个网络传输的调试小工具,可以伪装TCP服务器/TCP客户端/UDP终端,方便调试IO的读写事件触发
3.2.1 添加句柄调用如下:
- 把这个IO加入到
IOManager
中去。 - 为这个
socket
套接字绑定两个回调函数,分别在这个IO上出现EPOLLIN
读事件和EPOLLOUT
写事件时自动触发执行。 ```cppdefine PORT 8080 //服务器端口号
//任务 void func1() { KIT_LOG_INFO(g_logger) << “func1 work!!!!!!!!!!”; }
int main() { Thread::_setName(“test”); KIT_LOG_INFO(g_logger) << “test begin”; IOManager iom;
iom.schedule(&func1);
/*设置一个socket IO 去测试异步触发*/
int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
if(sock_fd < 0)
{
std::cout << "socket create error:" << strerror(errno);
return 0;
}
//固定的socket编程
struct sockaddr_in sockaddr;
bzero(&sockaddr, sizeof(struct sockaddr_in));
sockaddr.sin_family = AF_INET;
sockaddr.sin_port = htons(PORT);
sockaddr.sin_addr.s_addr = inet_addr("192.168.77.1"); //连接模拟TCP服务器
//为sock_fd 添加读事件回调
iom.addEvent(sock_fd, IOManager::Event::READ, [](){
KIT_LOG_INFO(g_logger) << "sock io read event!!";
});
//为sock_fd 添加写事件回调
iom.addEvent(sock_fd, IOManager::Event::WRITE, [](){
KIT_LOG_INFO(g_logger) << "sock io wire event!!";
});
//连接服务器
if(connect(sock_fd, (struct sockaddr*)&sockaddr, sizeof(struct sockaddr)) < 0)
{
std::cout << "connect error:" << strerror(errno) << std::endl;
}
//把socket 置为非阻塞
fcntl(sock_fd, F_SETFL, O_NONBLOCK);
//发送一段数据 触发写事件
send(sock_fd, "hello", 5, 0);
KIT_LOG_INFO(g_logger) << "test end";
return 0;
}
<a name="beebd"></a>
#### BUG:运行后没有发现触发了写事件

- 读事件被触发,经过`send`之后,TCP服务器也收到了发送的数据,TCP连接没有问题,数据收发正常:使用 `netstat -ntlap | grep test_io`查看当前进程的IO连接情况:成功建立。

- 现象:写事件绑定的回调没有被执行。并且线程在`idle()`函数中陷入`epoll_wait()`函数死循环。由于添加了写事件回调,而该回调没有被正常消耗执行掉,导致`m_pendingEventCount`待处理事件数量没有为0,`IOManager::stopping()`终止函数无法终止,调度器无法关闭。

- 原因:将自定义的事件`Event`和现有`epoll_event`混为使用,导致了放入到`epoll`中的事件不能正确触发:

- 要么将这二者统一,要么将二者完全分离。选择前者,将其统一。其他使用到的地方,也照做

<a name="roulw"></a>
#### 修正后运行结果如下:

<a name="Ns5ag"></a>
### 3.2.2 删除句柄调用如下:
在触发写事件之前,将之前设置在句柄上的读事件删除,使用TCP服务器发送消息触发IO读事件,但打印中不会出现读事件对应的回调函数。
```cpp
static Logger::ptr g_logger = KIT_LOG_ROOT();
#define PORT 8080
void func1()
{
KIT_LOG_INFO(g_logger) << "func1 work!!!!!!!!!!";
}
int main()
{
Thread::_setName("test");
KIT_LOG_INFO(g_logger) << "test begin";
IOManager iom;
iom.schedule(&func1);
/*设置一个socket IO 去测试异步触发*/
int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
if(sock_fd < 0)
{
std::cout << "socket create error:" << strerror(errno);
return 0;
}
struct sockaddr_in sockaddr;
bzero(&sockaddr, sizeof(struct sockaddr_in));
sockaddr.sin_family = AF_INET;
sockaddr.sin_port = htons(PORT);
sockaddr.sin_addr.s_addr = inet_addr("192.168.77.1"); //连接模拟TCP服务器
iom.addEvent(sock_fd, IOManager::Event::READ, [](){
KIT_LOG_INFO(g_logger) << "sock io read event!!";
});
iom.addEvent(sock_fd, IOManager::Event::WRITE, [](){
KIT_LOG_INFO(g_logger) << "sock io wire event!!";
});
if(connect(sock_fd, (struct sockaddr*)&sockaddr, sizeof(struct sockaddr)) < 0)
{
std::cout << "connect error:" << strerror(errno) << std::endl;
}
//把socket 置为非阻塞
fcntl(sock_fd, F_SETFL, O_NONBLOCK);
//删除之前在句柄上设置的读事件
iom.delEvent(sock_fd, IOManager::Event::READ);
send(sock_fd, "hello", 5, 0);
KIT_LOG_INFO(g_logger) << "test end";
return 0;
}
小BUG:~~epoll_ctl()~~
返回错误:不被允许的参数。
原因:~~epoll_event~~
中的~~events~~
类型为~~uint32_t~~
32位无符号整数,但是自定义的枚举类型默认为~~int~~
32位有符号整数,在做位运算时会有负数/0值,导致传入到~~epoll~~
中的时候出现问题。
C++11允许指定枚举类型:指定为8位无符号整数
修正后运行结果如下:
以上的小BUG的现象:确实是epoll_event
传参不对导致的Invalid argument
,即我们自定义枚举类型Event
的问题。而真正原因并非枚举类型指定数据类型的问题,无需将其指定为uint
无符号整型。真正导致BUG的原因是:
书写主动触发事件函数triggerEvent()
时,需要把FdContext
中的主动触发的事件去除,这个语句手误写错。导致不能正常将事件去除而重复触发/不触发/错误触发。
- 正确写法:
3.2.3 取消句柄事件调用如下:
在触发写事件之前,将之前设置在句柄上的写事件取消,则打印中没有send
也会触发回调。
static Logger::ptr g_logger = KIT_LOG_ROOT();
#define PORT 8080
void func1()
{
KIT_LOG_INFO(g_logger) << "func1 work!!!!!!!!!!";
}
int main()
{
Thread::_setName("test");
KIT_LOG_INFO(g_logger) << "test begin";
IOManager iom;
iom.schedule(&func1);
/*设置一个socket IO 去测试异步触发*/
int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
if(sock_fd < 0)
{
std::cout << "socket create error:" << strerror(errno);
return 0;
}
struct sockaddr_in sockaddr;
bzero(&sockaddr, sizeof(struct sockaddr_in));
sockaddr.sin_family = AF_INET;
sockaddr.sin_port = htons(PORT);
sockaddr.sin_addr.s_addr = inet_addr("192.168.77.1"); //连接百度
iom.addEvent(sock_fd, IOManager::Event::READ, [](){
KIT_LOG_INFO(g_logger) << "sock io read event!!";
});
iom.addEvent(sock_fd, IOManager::Event::WRITE, [](){
KIT_LOG_INFO(g_logger) << "sock io wire event!!";
});
if(connect(sock_fd, (struct sockaddr*)&sockaddr, sizeof(struct sockaddr)) < 0)
{
std::cout << "connect error:" << strerror(errno) << std::endl;
}
//把socket 置为非阻塞
fcntl(sock_fd, F_SETFL, O_NONBLOCK);
//取消设置好的的读事件回调
iom.cancelEvent(sock_fd, IOManager::Event::READ);
send(sock_fd, "hello", 5, 0);
KIT_LOG_DEBUG(g_logger) << "send end";
KIT_LOG_INFO(g_logger) << "test end";
return 0;
}
运行结果如下:
需要注意的是:由于我们这里自己手动添加了两个事件,因此需要把这两个事件都处理完,IO调度器才会停止退出。处理完的标志要么这个IO接口满足条件触发,要么手动将这个描述符上的事件取消。如果TCP服务器不发来消息,那么注册的读事件永远不触发,m_pendingEventCount
待处理事件数量永远不为0,IOManager::stoppping()
永远返回false
。
因此,这里的情况是读事件被取消;写事件是由send
向服务器发送了数据满足条件触发。
3.3 多线程IO调度
3.3.1 主线程参与调度
在主线程中,构造IO调度器IOManager
,并且通过该调度器添加一个任务schedule(&func1)
。
在主线程创建一个网络IOsocket
,并在其描述符sock_fd
上绑定了两个事件触发:读事件触发和写事件触发。用这个套接字去连接一个模拟的TCP服务器,并且send
发送一条消息。
调用如下:
static Logger::ptr g_logger = KIT_LOG_ROOT();
#define PORT 8080
void func1()
{
KIT_LOG_INFO(g_logger) << "func1 work!!!!!!!!!!";
}
int main()
{
Thread::_setName("test");
KIT_LOG_INFO(g_logger) << "test begin";
IOManager iom("test", 2, false);
iom.schedule(&func1);
/*设置一个socket IO 去测试异步触发*/
int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
if(sock_fd < 0)
{
std::cout << "socket create error:" << strerror(errno);
return 0;
}
struct sockaddr_in sockaddr;
bzero(&sockaddr, sizeof(struct sockaddr_in));
sockaddr.sin_family = AF_INET;
sockaddr.sin_port = htons(PORT);
sockaddr.sin_addr.s_addr = inet_addr("192.168.77.1"); //连接百度
iom.addEvent(sock_fd, IOManager::Event::READ, [](){
KIT_LOG_INFO(g_logger) << "sock io read event!!";
});
iom.addEvent(sock_fd, IOManager::Event::WRITE, [](){
KIT_LOG_INFO(g_logger) << "sock io wire event!!";
});
if(connect(sock_fd, (struct sockaddr*)&sockaddr, sizeof(struct sockaddr)) < 0)
{
std::cout << "connect error:" << strerror(errno) << std::endl;
}
//把socket 置为非阻塞
fcntl(sock_fd, F_SETFL, O_NONBLOCK);
send(sock_fd, "hello", 5, 0);
KIT_LOG_DEBUG(g_logger) << "send end";
KIT_LOG_INFO(g_logger) << "test end";
return 0;
}
运行结果如下:
这里选择的是把主线程也纳入调度中,因此可以看到主线程也在”抢”任务执行。
3.3.2 主线程不参与调度
在主线程中,构造IO调度器IOManager
,并且通过该调度器添加一个任务schedule(&func1)
。
在任务里(也就是把socket
的创建放在子线程中去,谁抢到任务谁会send
发送数据)创建一个网络IOsocket
,并在其描述符sock_fd
上绑定了两个事件触发:读事件触发和写事件触发。用这个套接字去连接一个模拟的TCP服务器,并且send
发送一条消息。
注意:这里必须把addEvent
套接字绑定事件触发,放到子线程中去执行,因为此时主线程已经不参与调度,不会初始化线程局部变量t_shceduelr
,而是要等到子线程运行起来把调度器的指针设置好了才行。否则addEvent()
函数中会拿到一个空指针赋值给FdContext
,造成段错误很危险!
调用如下:
static Logger::ptr g_logger = KIT_LOG_ROOT();
#define PORT 8080
void func1()
{
KIT_LOG_INFO(g_logger) << "func1 work!!!!!!!!!!";
}
void func2()
{
KIT_LOG_INFO(g_logger) << "func2 work!!!!!!!!!!";
}
void netio_test()
{
/*设置一个socket IO 去测试异步触发*/
int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
if(sock_fd < 0)
{
std::cout << "socket create error:" << strerror(errno);
return;
}
struct sockaddr_in sockaddr;
bzero(&sockaddr, sizeof(struct sockaddr_in));
sockaddr.sin_family = AF_INET;
sockaddr.sin_port = htons(PORT);
sockaddr.sin_addr.s_addr = inet_addr("192.168.77.1"); //连接百度
IOManager::GetThis()->addEvent(sock_fd, IOManager::Event::READ, [](){
KIT_LOG_INFO(g_logger) << "sock io read event!!";
});
IOManager::GetThis()->addEvent(sock_fd, IOManager::Event::WRITE, [](){
KIT_LOG_INFO(g_logger) << "sock io wire event!!";
});
if(connect(sock_fd, (struct sockaddr*)&sockaddr, sizeof(struct sockaddr)) < 0)
{
std::cout << "connect error:" << strerror(errno) << std::endl;
}
//把socket 置为非阻塞
fcntl(sock_fd, F_SETFL, O_NONBLOCK);
send(sock_fd, "hello", 5, 0);
KIT_LOG_DEBUG(g_logger) << "send end";
}
int main()
{
Thread::_setName("test");
KIT_LOG_INFO(g_logger) << "test begin";
IOManager iom("test", 3, false);
iom.schedule(&func1);
iom.schedule(&func2);
iom.schedule(&netio_test);
KIT_LOG_INFO(g_logger) << "test end";
return 0;
}
运行结果如下:
整个过程中主线程19955:test
都没有参与到任务”竞争”与”执行”的过程中,仅仅是开辟了3个子协程(线程)然后等待他们全部执行完毕回收清理。
4. 定时器测试
4.1 定时器添加
调用如下:
static Logger::ptr g_logger = KIT_LOG_NAME("root");
int main()
{
KIT_LOG_DEBUG(g_logger) << "test begin";
IOManager iom("test", 2);
auto f1 = [=](){
KIT_LOG_INFO(g_logger) << "hello timer!!";
};
auto f2 = [=](){
KIT_LOG_INFO(g_logger) << "hello recurring timer!!";
};
//创建一个一次性定时器
iom.addTimer(1000, f1, false);
//创建一个循环定时器
iom.addTimer(3000, f2, true);
KIT_LOG_DEBUG(g_logger) << "test end";
return 0;
}
运行结果如下:
4.2 定时器取消
调用如下:
static Logger::ptr g_logger = KIT_LOG_NAME("root");
int main()
{
KIT_LOG_DEBUG(g_logger) << "test begin";
IOManager iom("test", 2);
//创建一个循环定时器 必须使用static
static Timer::ptr t = iom.addTimer(1000, [](){
static int i = 0;
KIT_LOG_INFO(g_logger) << "hello timer!!, i = " << i;
//循环调用三次后取消该定时器
if(++i == 3)
t->cancel();
}, true);
KIT_LOG_DEBUG(g_logger) << "test end";
return 0;
}
运行结果如下:
4.3 定时器重新设定间隔时间
调用如下:
static Logger::ptr g_logger = KIT_LOG_NAME("root");
int main()
{
KIT_LOG_DEBUG(g_logger) << "test begin";
IOManager iom("test", 2);
//创建一个循环定时器 必须使用static
static Timer::ptr t = iom.addTimer(1000, [](){
static int i = 0;
KIT_LOG_INFO(g_logger) << "hello timer!!, i = " << i;
//循环调用三次后 将该定时器间隔时间重置为3000ms
if(++i == 3)
t->reset(3000, true);
}, true);
KIT_LOG_DEBUG(g_logger) << "test end";
return 0;
}