1. IO协程调度封装
构思
使用协程开发(一)中已经完成的协程调度模块,封装出具体的调度子类IOManager
。不仅有基本的协程调度分配任务、执行任务的功能,而且由于是服务器框架,一定绕不开网络IO的管理,用于管理各种IO接口以及上面发生的读写事件,根据读写事件触发相应的异步回调函数(自动装入任务队列执行)
· 类关系
基本IO管理思路
1.1 成员变量
/**
* @brief IO调度器类
*/
class IOManager: public Scheduler, public TimerManager
{
...
...
public:
/**
* @brief 事件枚举类型 直接对标epoll里的事件赋值
*/
enum Event{
NONE = 0x0, //无事件
READ = 0x1, //读事件 EPOLLIN
WRITE = 0x4 //写事件 EPOLLOUT
};
private:
///epoll句柄
int m_epfd;
///管道描述符 作为唤醒工具
int m_tickleFds[2];
///待处理事件数量
std::atomic<size_t> m_pendingEventCount = {0};
///读写锁
MutexType m_mutex;
///句柄对象指针数组
std::vector<FdContext*> m_fdContexts;
};
* 封装套接字句柄对象+事件对象
目的:方便归纳句柄、事件所具有的属性。句柄带有事件,事件依附于句柄。只有句柄上才有事件触发
注意:和HOOK模块的文件句柄类做一个区分,这里的套接字句柄专门针对套接字上的事件做回调管理。
class IOManager: public Scheduler
{
...
...
private:
/**
* @brief 句柄对象结构体
*/
struct FdContext
{
typedef Mutex MutexType;
/**
* @brief 事件对象结构体
*/
struct EventContext
{
///被调度的调度器
Scheduler *scheduler = nullptr;
//事件绑定的协程
Coroutine::ptr coroutine;
//事件绑定的函数
std::function<void()> cb;
};
///句柄/文件描述符
int fd = 0;
///句柄上的读事件对象
EventContext read_event;
///句柄上的写事件对象
EventContext write_event;
///句柄上注册好的事件
Event events = NONE;
///互斥锁
MutexType mutex;
/**
* @brief 获取句柄对象上的事件对象
* @param[in] event 读/写事件
* @return EventContext&
*/
EventContext& getEventContext(Event event);
/**
* @brief 清空句柄对象上的事件对象
* @param[in] event_ctx 读/写事件对象
*/
void resetEventContext(struct EventContext& event_ctx);
/**
* @brief 主动触发事件,执行事件对象上的回调函数
* @param[in] event 读/写事件
*/
void triggerEvent(Event event);
};
...
...
}
接口
1). getEventContext()
功能:获取句柄对象上的事件对象
/**
* @brief 获取句柄对象上的事件对象
* @param[in] event 读/写事件
* @return EventContext&
*/
EventContext& getEventContext(Event event);
IOManager::FdContext::EventContext& IOManager::FdContext::getEventContext(IOManager::Event event)
{
switch(event)
{
case IOManager::READ: return read_event;break;
case IOManager::WRITE: return write_event;break;
default:
KIT_ASSERT2(false, "getContext error");
}
throw std::invalid_argument("getContext invalid event!!");
}
2). resetEventContext()
功能:清空句柄对象上的事件对象
/**
* @brief 清空句柄对象上的事件对象
* @param[in] event_ctx 读/写事件对象
*/
void resetEventContext(struct EventContext& event_ctx);
3). triggerEvent()
功能:
/**
* @brief 主动触发事件,执行事件对象上的回调函数
* @param[in] event 读/写事件
*/
void triggerEvent(Event event);
void IOManager::FdContext::triggerEvent(IOManager::Event event)
{
KIT_ASSERT(events & event);
//把触发后的事件去除掉
events = (Event)(events & ~event);
EventContext& ctx = getEventContext(event);
KIT_ASSERT(ctx.cb || ctx.coroutine);
if(ctx.cb)
{
ctx.scheduler->schedule(&ctx.cb);
}
else if(ctx.coroutine)
{
ctx.scheduler->schedule(&ctx.coroutine);
}
ctx.scheduler = nullptr;
}
1.2 接口
1.2.1 构造函数
/**
* @brief IO调度器类构造函数
* @param[in] name 调度器名称
* @param[in] threads_size 初始线程数量
* @param[in] use_caller 当前线程是否作为调度线程
*/
IOManager(const std::string& name = "", size_t threads_size = 1, bool use_caller = true);
IOManager::IOManager(const std::string& name, size_t threads_size, bool use_caller)
:Scheduler(name, threads_size, use_caller)
{
//创建epoll句柄
m_epfd = epoll_create(1);
if(m_epfd < 0)
{
KIT_LOG_ERROR(g_logger) << "IOManager: epoll_create error";
KIT_ASSERT2(false, "epoll_create error");
}
//创建管道句柄
int ret = pipe(m_tickleFds);
if(ret < 0)
{
KIT_LOG_ERROR(g_logger) << "IOManager: pipe create error";
KIT_ASSERT2(false, "pipe create error");
}
//初始化epoll事件
struct epoll_event event;
memset(&event, 0, sizeof(struct epoll_event));
//设置为 读事件触发 以及 边缘触发
event.events = EPOLLIN | EPOLLET;
// [0]为读管道 [1]为写管道
event.data.fd = m_tickleFds[0];
//设置读管道句柄属性 将读fd 设置为非阻塞
ret = fcntl(m_tickleFds[0], F_SETFL, O_NONBLOCK);
if(ret < 0)
{
KIT_LOG_ERROR(g_logger) << "IOManager: fcntl error";
KIT_ASSERT2(false, "fcntl error");
}
//将当前的事件添加到epoll中
ret = epoll_ctl(m_epfd, EPOLL_CTL_ADD, m_tickleFds[0], &event);
if(ret < 0)
{
KIT_LOG_ERROR(g_logger) << "IOManager: epoll_ctl";
KIT_ASSERT2(false, "epoll_ctl error");
}
//默认为个事件信息
contextResize(256);
//启动IO调度器
start();
}
1.2.2 析构函数
/**
* @brief IO调度器类析构函数
*/
~IOManager();
IOManager::~IOManager()
{
//停止调度器
stop();
close(m_epfd); //关闭epoll句柄
close(m_tickleFds[0]); //关闭读管道句柄
close(m_tickleFds[1]); //关闭写管道句柄
//删除事件对象分配的空间
//不使用智能指针的原因:要把空间释放集中到持有调度器的这个线程中
for(size_t i = 0;i < m_fdContexts.size();++i)
{
if(m_fdContexts[i])
delete m_fdContexts[i];
}
}
1.2.3 IOManager大概几个核心的接口
下面几个接口已经实现在基类Scheduler
中,派生类中仍然使用:
start()
开启调度器运作stop()
停止调度器运作run()
这个函数真正执行调度逻辑schduleNoLock()
将任务加进队列
下面几个接口在派生类中IOManager
为核心实现:
tickle()
线程唤醒stopping()
判断调度器是否可以停止idle()
协程没有任务可做时的处理,借助epoll_wait
来唤醒有任务可执行
1). tickle()
功能:通过pipe
管道特性,将管道读端加入到epoll
的管理中。通过往管道写端写入数据(写入什么不重要)就能触发读端。假设现在只有那个管道读IO活跃,退出epoll_wait()
循环监测后,没有其他网络IO需要处理,就会swapOut()
切回到run()
函数中,去查看任务队列的情况,完成一次线程的”唤醒”。
几处用到**tickle()**
地方:
schedule()
添加任务,判断队列在本次添加前是否空:空就tickle()
;不空不唤醒。run()
在轮询任务队列的时候,发现有任务不属于自己执行,并且也没有指定任何线程执行的时候要调用tickle()
去唤醒线程执行这些任务stop()
准备关闭调度器时候,要把所有创建的线程唤醒,去让他们退出:
2). stopping()
功能:和基类几乎一样。多判断一下待处理事件数量m_pendingEventCount
和定时器队列是否空isTimersEmpty()
/**
* @brief 调度器停止的判断条件
* @return true 停止成功
* @return false 停止失败
*/
bool stopping() override;
bool IOManager::stopping()
{
return m_pendingEventCount == 0 &&
isTimersEmpty() && Scheduler::stopping();
}
3). idle()
(核心)
功能:
- 当前协程分配到的任务完成之后,
epoll
里检查一下是否有别的IO活动,调度对应的回调函数/协程 - 当前协程没有任务可做,IO也没有活动的迹象,陷入到
epoll_wait
的循环里面,阻塞等待。
核心逻辑:
while(1){
检查调度器是否已经停止。已经停止就不能往下执行逻辑,需要退出函数。陷入~~epoll_wait~~
,等待IO事件返回,默认超时时间为~~5000ms~~
如果有就绪IO,就带回对应IO:网络IO,依次轮询触发注册好的异步读/写回调函数作为任务加入队列供调度;管道读IO,由~~tickle()~~
函数发送消息导致的IO活动,用于唤醒,将其忽略(不绑定回调函数作为任务去调度仅仅是唤醒作用)。
如果在超时时间内没有就绪IO,停止监测,让出当前协程执行权~~swapOut()~~
。
}
- 加入定时器后,结合
**epoll_wait**
有改动
while(1)
{
- 检查调度器是否已经停止。已经停止就不能往下执行逻辑,需要退出函数。
- 陷入
epoll_wait
,等待IO事件返回,超时时间需要考虑最近快到期的定时任务和默认超时时间取min
选其中的小值。 - 退出
epoll_wait
之后首先检查,是否是因为有到时定时器需要处理。- 有超时定时器,就把所有的超时任务加入调度
- 没有就处理就绪IO
- 如果有就绪IO,就带回对应IO:
- 网络IO,依次轮询触发注册好的异步读/写回调函数作为任务加入队列供调度;
- 管道读IO,由
tickle()
函数发送消息导致的IO活动,用于唤醒,将其忽略(不绑定回调函数作为任务去调度仅仅是唤醒作用)。
- 如果在超时时间内没有就绪IO/到时的定时器,停止监测,让出当前协程执行权
swapOut()
,回到Scheduler::run()
函数中去。
原版
//协程没有任务可做 要执行epoll监听
void IOManager::idle()
{
KIT_LOG_DEBUG(g_logger) << "idle start";
//4个一组 取出已经就绪的IO
struct epoll_event *events = new epoll_event[4];
//小技巧:借助智能指针的指定析构函数 自动释放数组
std::shared_ptr<struct epoll_event> evs_sp(events, [=](struct epoll_event* p){
delete[] events;
});
while(1)
{
/*1.如果调度器关闭了 就退出该函数*/
if(stopping())
{
KIT_LOG_INFO(g_logger) << "iomanager name=" << getName() << " is stopping, idle func exit";
return;
}
/*2.通过epoll_wait 带回已经就绪的IO*/
int n_ready;
do{
//最大超时时间 5000ms
static const int MAXTIMEOUT = 5000;
n_ready = epoll_wait(m_epfd, events, 32, MAXTIMEOUT);
KIT_LOG_DEBUG(g_logger) << "epoll_wait n_ready=" << n_ready;
if(n_ready < 0 && errno == EINTR)
continue; //重新尝试等待wait
else //拿到了返回epoll_event 或者 已经超时就break
break;
}while(1);
/*3.依次处理已经就绪的IO*/
for(int i = 0;i < n_ready;++i)
{
struct epoll_event& event = events[i];
//过滤外部发消息唤醒的IO 跳过
if(event.data.fd == m_tickleFds[0])
{
KIT_LOG_DEBUG(g_logger) << "读管道活跃";
uint8_t temp;
//循环的目的 把缓冲区全部读干净
while(read(m_tickleFds[0], &temp, 1) == 1);
continue;
}
//处理剩下的真正就绪的IO
FdContext* fd_ctx = (FdContext*)event.data.ptr;
FdContext::MutexType::Lock lock(fd_ctx->mutex);
//如果是错误或者中断 导致的活动 重置一下
if(event.events & (EPOLLERR | EPOLLHUP))
event.events |= EPOLLIN | EPOLLOUT;
//开一个变量转换 从epoll_event的事件----->自定义的事件Event
int real_events = NONE;
if(event.events & EPOLLIN)
{
real_events |= READ;
}
if(event.events & EPOLLOUT)
{
real_events |= WRITE;
}
//和当前IO上的事件对比 为0x0没有事件触发 跳过
if((fd_ctx->events & real_events) == Event::NONE)
continue;
//把下面准备主动触发的事件 去除掉 剩余的事件放回epoll中
//因为不知道是单有读 单有写 读写都有 需要涵盖这三种情况
int left_events = (fd_ctx->events & ~real_events);
int op = left_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
//复用event
event.events = EPOLLET | left_events;
int ret = epoll_ctl(m_epfd, op, fd_ctx->fd, &event);
if(ret < 0)
{
KIT_LOG_ERROR(g_logger) << "\nidel: epoll_ctl(" << m_epfd << ", " << op << ", " << fd_ctx->fd << ", " << fd_ctx->events << ");\n"
<< " error:" << ret << "(" << errno << "," << strerror(errno) << ")";
continue;
}
//把剩余没有触发的读事件 主动触发
if(real_events & READ)
{
KIT_LOG_DEBUG(g_logger) << "idle 读事件触发";
fd_ctx->triggerEvent(READ);
--m_pendingEventCount;
}
//把剩余没有触发的写事件 主动触发
if(real_events & WRITE)
{
KIT_LOG_DEBUG(g_logger) << "idle 写事件触发";
fd_ctx->triggerEvent(WRITE);
--m_pendingEventCount;
}
}
/*4.处理完就绪的IO 让出当前协程的执行权 到Scheduler::run中去*/
Coroutine::ptr cur = Coroutine::GetThis();
auto p = cur.get();
cur.reset();
p->swapOut();
}
}
改动(加入Timer)
//协程没有任务可做 要执行epoll监听
void IOManager::idle()
{
KIT_LOG_DEBUG(g_logger) << "idle start";
//4个一组 取出已经就绪的IO
struct epoll_event *events = new epoll_event[4];
//小技巧:借助智能指针的指定析构函数 自动释放数组
std::shared_ptr<struct epoll_event> evs_sp(events, [=](struct epoll_event* p){
delete[] events;
});
while(1)
{
KIT_LOG_DEBUG(g_logger) << "while(1) again";
/*1.如果调度器关闭了 就退出该函数*/
if(stopping())
{
//如果定时器队列为空就可以退出了
if(isTimersEmpty())
{
KIT_LOG_INFO(g_logger) << "iomanager name= " << getName() << " is stopping, idle func exit";
return;
}
}
//获取下一次定时器的执行时间
uint64_t next_timeout = getNextTime();
/*2.通过epoll_wait 带回已经就绪的IO*/
int n_ready;
do{
//最大超时时间 5000ms
static const int MAXTIMEOUT = 1000;
//下一次定时器执行时间
if(next_timeout != ~0ull)
{
next_timeout = next_timeout > MAXTIMEOUT ? MAXTIMEOUT : next_timeout;
}
else
{
next_timeout = MAXTIMEOUT;
}
n_ready = epoll_wait(m_epfd, events, 32, (int)next_timeout);
KIT_LOG_DEBUG(g_logger) << "epoll_wait n_ready=" << n_ready;
if(n_ready < 0 && errno == EINTR)
continue; //重新尝试等待wait
else //拿到了返回epoll_event 或者 已经超时就break
break;
}while(1);
/*3. 检查定时器队列 将所有到时定时器任务进行调度*/
std::vector<std::function<void()> > cbs;
listExpiredCb(cbs);
if(cbs.size())
{
schedule(cbs.begin(), cbs.end());
cbs.clear();
}
/*4.依次处理已经就绪的IO*/
for(int i = 0;i < n_ready;++i)
{
struct epoll_event& event = events[i];
//过滤外部发消息唤醒的IO 跳过
if(event.data.fd == m_tickleFds[0])
{
KIT_LOG_DEBUG(g_logger) << "读管道活跃";
uint8_t temp;
//循环的目的 把缓冲区全部读干净
while(read(m_tickleFds[0], &temp, 1) == 1);
continue;
}
//处理剩下的真正就绪的IO
FdContext* fd_ctx = (FdContext*)event.data.ptr;
FdContext::MutexType::Lock lock(fd_ctx->mutex);
//如果是错误或者中断 导致的活动 重置一下
if(event.events & (EPOLLERR | EPOLLHUP))
event.events |= EPOLLIN | EPOLLOUT;
//开一个变量转换 从epoll_event的事件----->自定义的事件Event
int real_events = 0;
if(event.events & EPOLLIN)
{
real_events |= READ;
}
if(event.events & EPOLLOUT)
{
real_events |= WRITE;
}
//和当前IO上的事件对比 不符合就说明没有事件触发 跳过
if((fd_ctx->events & real_events) == Event::NONE)
continue;
//把下面准备主动触发的事件 去除掉 剩余的事件放回epoll中
//BUG点:自定义的Event 和
int left_events = (fd_ctx->events & ~real_events);
int op = left_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
//复用event
event.events = EPOLLET | left_events;
int ret = epoll_ctl(m_epfd, op, fd_ctx->fd, &event);
if(ret < 0)
{
KIT_LOG_ERROR(g_logger) << "\nidel: epoll_ctl(" << m_epfd << ", " << op << ", " << fd_ctx->fd << ", " << fd_ctx->events << ");\n"
<< " error:" << ret << "(" << errno << "," << strerror(errno) << ")";
continue;
}
//把需要触发的读事件 主动触发
if(real_events & READ)
{
KIT_LOG_DEBUG(g_logger) << "idle 读事件触发";
fd_ctx->triggerEvent(READ);
--m_pendingEventCount;
}
//把需要触发的写事件 主动触发
if(real_events & WRITE)
{
KIT_LOG_DEBUG(g_logger) << "idle 写事件触发";
fd_ctx->triggerEvent(WRITE);
--m_pendingEventCount;
}
}
/*4.处理完就绪的IO 让出当前协程的执行权 到Scheduler::run中去*/
Coroutine::ptr cur = Coroutine::GetThis();
auto p = cur.get();
cur.reset();
//从这切回 又会 从这切进
p->swapOut();
}
}
4). onTimerInsertedAtFront()
功能:定时器队列队头插入对象后进行epoll_wait超时更新
/**
* @brief 定时器队列队头插入对象后进行epoll_wait超时更新
*/
void onTimerInsertedAtFront() override;
void IOManager::onTimerInsertedAtFront()
{
//唤醒一下 在epoll_wait的线程
tickle();
}
1.2.4 addEvent()
功能:为句柄添加/修改事件,创建对应的事件对象
/**
* @brief 为句柄添加事件 0成功 -1出错
* @param[in] fd 给哪一个句柄fd添加
* @param[in] event 读/写事件
* @param[in] cb 要执行的回调函数
* @return
* @retval 0 添加成功
* @retval -1 添加失败
*
*/
int addEvent(int fd, Event event, std::function<void()> cb = nullptr);
int IOManager::addEvent(int fd, Event event, std::function<void()> cb)
{
FdContext *fd_ctx = nullptr;
/*拿到对应的 句柄对象 没有就创建*/
//给句柄队列加读锁
MutexType::ReadLock lock(m_mutex);
if((int)m_fdContexts.size() > fd)
{
//KIT_LOG_DEBUG(g_logger) << "存在并取出 fd = " << fd;
fd_ctx = m_fdContexts[fd];
lock.unlock(); //解读锁
}
else //事件对象扩容
{
//解读锁
lock.unlock();
//给句柄队列加写锁
MutexType::WriteLock lock2(m_mutex);
KIT_LOG_DEBUG(g_logger) << "fd不存在并扩容,fd=" << fd;
contextResize(fd * 1.5);
fd_ctx = m_fdContexts[fd];
}
//给句柄资源加互斥锁
FdContext::MutexType::Lock _lock(fd_ctx->mutex);
/*设置句柄对象的信息*/
//同一个句柄上面不能加加相同的事件
//如果有这种情况出现 说明有多个线程在操作同一个句柄
if(KIT_UNLIKELY(fd_ctx->events & event))
{
KIT_LOG_ERROR(g_logger) << "addEvent: event exists, fd= " << fd
<< ", event=" << event
<< ";exist event=" << fd_ctx->events;
KIT_ASSERT(!(fd_ctx->events & event));
}
//判断本次事件修改还是新增
int op = fd_ctx->events ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
struct epoll_event ev;
ev.events = EPOLLET | fd_ctx->events | event;
ev.data.ptr = fd_ctx;
//将事件添加/修改到epoll
int ret = epoll_ctl(m_epfd, op, fd, &ev);
if(ret < 0)
{
KIT_LOG_ERROR(g_logger) << "\naddEvent: epoll_ctl(" << m_epfd << ", " << op << ", " << fd << ", " << ev.events << ");\n"
<< " error:" << ret << "(" << errno << "," << strerror(errno) << ")";
return -1;
}
//待处理事件自增
++m_pendingEventCount;
//将句柄上的事件叠加
fd_ctx->events = (Event)(fd_ctx->events | event);
//构建对应的添加的读/写 事件对象 设置相关信息
//要加 读事件就返回 read_event;要加写事件 就返回write_event
FdContext::EventContext& event_contex = fd_ctx->getEventContext(event);
KIT_ASSERT(!event_contex.scheduler && !event_contex.coroutine && !event_contex.cb);
//设置调度器
event_contex.scheduler = Scheduler::GetThis();
//设置回调函数
if(cb)
event_contex.cb.swap(cb);
else //没有设置回调 下一次就继续执行当前协程
{
event_contex.coroutine = Coroutine::GetThis();
//给事件对象绑定协程时候 协程应该是运行的
KIT_ASSERT2(event_contex.coroutine->getState() == Coroutine::State::EXEC, "thread id=" << GetThreadId() << ",coroutine id=" << event_contex.coroutine->getID() << ",state=" << event_contex.coroutine->getState());
}
return 0;
}
1.2.5 delEvent()
功能:为句柄删除单个事件
/**
* @brief 为句柄删除单个事件
* @param[in] fd 给哪一个句柄fd删除
* @param[in] event 读/写事件
* @return true 删除成功
* @return false 删除失败
*/
bool delEvent(int fd, Event event);
bool IOManager::delEvent(int fd, Event event)
{
MutexType::ReadLock lock(m_mutex);
//句柄对象不存在不用删除
if((int)m_fdContexts.size() <= fd)
{
return false;
}
FdContext* fd_ctx = m_fdContexts[fd];
lock.unlock();
//给句柄资源加互斥锁
FdContext::MutexType::Lock lock2(fd_ctx->mutex);
//该句柄上没有对应事件 不用删除
if(!(fd_ctx->events & event))
{
return false;
}
//取反运算+与运算 就是去掉该事件event
Event left_events = (Event)(fd_ctx->events & ~event);
//去掉之后看句柄上还是否有剩余的事件 有就修改epoll 没有了就从epoll删除
int op = left_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
//构造epoll_event事件
struct epoll_event ev;
ev.events = EPOLLET | left_events;
ev.data.ptr = fd_ctx;
//将事件添加/修改到epoll
int ret = epoll_ctl(m_epfd, op, fd, &ev);
if(ret < 0)
{
KIT_LOG_ERROR(g_logger) << "\ndelEvent: epoll_ctl(" << m_epfd << ", " << op << ", " << fd << ", " << ev.events << ");\n"
<< " error:" << ret << "(" << errno << "," << strerror(errno) << ")";
return false;
}
//更新句柄上的事件
fd_ctx->events = left_events;
//待处理事件对象自减
--m_pendingEventCount;
//把fdContext句柄对象中对应的读/写事件对象EventContext拿出来清空
FdContext::EventContext& event_context = fd_ctx->getEventContext(event);
fd_ctx->resetEventContext(event_context);
return true;
}
1.2.6 cancelEvent()
功能:为句柄取消单个事件,找到对应事件强制触发执行,不等待句柄上的事件触发
/**
* @brief 为句柄取消单个事件,找到对应事件强制触发执行,不等待条件满足
* @param[in] fd 给哪一个句柄fd取消
* @param[in] event 读/写事件
* @return true 取消并触发成功
* @return false 取消并触发失败
*/
bool cancelEvent(int fd, Event event);
bool IOManager::cancelEvent(int fd, Event event)
{
MutexType::ReadLock lock(m_mutex);
//句柄不存在不用触发
if((int)m_fdContexts.size() <= fd)
{
return false;
}
FdContext* fd_ctx = m_fdContexts[fd];
lock.unlock();
FdContext::MutexType::Lock lock2(fd_ctx->mutex);
//该句柄上没有对应事件 不用触发
if(!(fd_ctx->events & event))
{
return false;
}
//取反运算 + 与运算 就是去掉该事件
Event left_events = (Event)(fd_ctx->events & ~event);
//去掉之后看句柄上还是否有剩余的事件 有就修改epoll 没有了就从epoll删除
int op = left_events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
//构造epoll_event事件
struct epoll_event ev;
ev.events = EPOLLET | left_events;
ev.data.ptr = fd_ctx;
//将事件添加/修改到epoll
int ret = epoll_ctl(m_epfd, op, fd, &ev);
if(ret < 0)
{
KIT_LOG_ERROR(g_logger) << "\ndelEvent: epoll_ctl(" << m_epfd << ", " << op << ", " << fd << ", " << ev.events << ");\n"
<< " error:" << ret << "(" << errno << "," << strerror(errno) << ")";
return false;
}
//主动触发句柄上事件绑定的回调函数 重新加入到任务队列里
fd_ctx->triggerEvent(event);
//待处理事件对象自减
--m_pendingEventCount;
return true;
}
1.2.7 cancelAll()
功能:为句柄取消所有事件,所有事件强制触发执行,不等待条件满足
/**
* @brief 为句柄取消所有事件,所有事件强制触发执行,不等待条件满足
* @param[in] fd 给哪一个句柄fd取消
* @return true 取消并触发成功
* @return false 取消并触发失败
*/
bool cancelAll(int fd);
bool IOManager::cancelAll(int fd)
{
MutexType::ReadLock lock(m_mutex);
//句柄不存在不用删除
if((int)m_fdContexts.size() <= fd)
{
return false;
}
FdContext* fd_ctx = m_fdContexts[fd];
lock.unlock();
FdContext::MutexType::Lock lock2(fd_ctx->mutex);
//该句柄上没有对应事件 不用删除
if(!fd_ctx->events)
{
return false;
}
//直接从epoll里移除该事件
int op = EPOLL_CTL_DEL;
//构造epoll_event事件
struct epoll_event ev;
ev.events = 0;
ev.data.ptr = fd_ctx;
//将事件删除到epoll
int ret = epoll_ctl(m_epfd, op, fd, &ev);
if(ret < 0)
{
KIT_LOG_ERROR(g_logger) << "\ndelEvent: epoll_ctl(" << m_epfd << ", " << op << ", " << fd << ", " << ev.events << ");\n"
<< " error:" << ret << "(" << errno << "," << strerror(errno) << ")";
return false;
}
if(fd_ctx->events & READ)
{
//事件对象 主动触发读事件对象上的回调
fd_ctx->triggerEvent(READ);
//待处理事件对象自减
--m_pendingEventCount;
}
if(fd_ctx->events & WRITE)
{
//事件对象 主动触发写事件对象上的回调
fd_ctx->triggerEvent(WRITE);
//待处理事件对象自减
--m_pendingEventCount;
}
//句柄对象上的注册事件应该为NONE = 0
KIT_ASSERT(fd_ctx->events == 0);
return true;
}
1.2.8 contextResize()
功能:保护类型接口,修改句柄对象数量
/**
* @brief 修改句柄对象数量
* @param[in] size 要修改到的数量
*/
void contextResize(size_t size);
void IOManager::contextResize(size_t size)
{
m_fdContexts.resize(size);
for(size_t i = 0;i < m_fdContexts.size();++i)
{
if(!m_fdContexts[i])
{
m_fdContexts[i] = new FdContext;
m_fdContexts[i]->fd = i;
}
}
}
2. 定时器Timer封装
功能:在间隔一个指定时间后,会自动的往任务队列中添加一个函数/协程,执行我们需要的操作。支持一次性定时器触发和循环定时器触发
使用场景:在特定时间段或者某一段时间内需要执行一些任务等场景
2.1 Timer类
Timer
类不能直接创建其对象,构造函数置为私有属性,将其对象创建交由TimerManager
类负责管理
Timer
和TimerManager
互相置为友元类。方便调用对方的接口以及访问对方的成员变量(主要为了能够调用Timer
的构造函数,不能显式调用);但是造成两个类的封装性被破坏,两个类的耦合度较高
class Timer
{
...
...
private:
//Timer的构造函数为私有意味着不能显示创建对象 必须由TimerManager来创建
Timer(uint64_t ms, std::function<void()> cb, bool recurring, TimerManager *manager);
Timer(uint64_t next);
...
...
}
2.1.1 成员变量
class Timer
{
...
...
private:
/// 定时器间隔时长
uint64_t m_ms = 0;
/// 执行的回调函数
std::function<void()> m_cb;
/// 当前定时器是否是循环定时器
bool m_recurring = false;
/// 精确的定时器超时时间点
uint64_t m_next = 0;
/// 定时器管理类指针
TimerManager* m_manager = nullptr;
};
2.1.2 接口
2.1.2.1 构造函数(私有)
功能:意味着不能显式创建对象 必须由TimerManager
来创建
/**
* @brief 定时器类构造函数 不能显式创建对象
* @param[in] ms 间隔时长
* @param[in] cb 执行的回调函数
* @param[in] recurring 是否是循环定时器
* @param[in] manager 哪一个定时器管理类
*/
Timer(uint64_t ms, std::function<void()> cb, bool recurring, TimerManager *manager);
Timer::Timer(uint64_t ms, std::function<void()> cb, bool recurring, TimerManager *manager)
:m_ms(ms), m_cb(cb), m_recurring(recurring), m_manager(manager)
{
//计算到期时间点
m_next = GetCurrentMs() + m_ms;
}
/**
* @brief 定时器类构造函数 不能显式创建对象
* @param[in] next 到期时间点
*/
Timer(uint64_t next);
Timer::Timer(uint64_t next)
:m_next(next)
{
}
2.1.2.1 cancel()
(核心)
功能:取消当前定时器的定时任务。
通过操作TimerManger
的定时器队列,通过比较定时器的内存地址以及到期时间点,找到对应的定时器对象,从定时器队列中移除。
/**
* @brief 取消定时器
* @return true 取消成功
* @return false 取消失败
*/
bool cancel();
bool Timer::cancel()
{
TimerManager::MutexType::WriteLock lock(m_manager->m_mutex);
if(m_cb)
{
m_cb = nullptr;
auto it = m_manager->m_timers.find(shared_from_this());
if(it == m_manager->m_timers.end())
{
KIT_LOG_ERROR(g_logger) << "Timer::cancel get timer error";
return false;
}
m_manager->m_timers.erase(it);
return true;
}
return false;
}
2.1.2.2 refresh()
功能:重新刷新定时器的间隔时间,让其重新开始新的计时间隔,但不改变原来设定好的时间间隔量。
注意:由于使用的容器set<Timer::ptr>
底层是红黑树,是一个有序结构,必须先将原来的定时器从这个有序序列中移除,再重新加入才能合法,否则直接原地修改会打乱有序规则,发生未知错误。
/**
* @brief 刷新定时器时间
* @return true 刷新成功
* @return false 刷新失败
*/
bool refresh();
bool Timer::refresh()
{
TimerManager::MutexType::WriteLock lock(m_manager->m_mutex);
if(!m_cb)
return false;
auto it = m_manager->m_timers.find(shared_from_this());
if(it == m_manager->m_timers.end())
{
return false;
}
//注意:由于使用的set 要先移除再进行重置加入 不能直接在原来的定时器上直接修改时间
m_manager->m_timers.erase(it);
m_next = GetCurrentMs() + m_ms;
m_manager->m_timers.insert(shared_from_this());
return true;
}
2.1.2.3 reset()
(核心)
功能:重新设定当前定时器的时间间隔量。并且还要判断是从即刻生效还是继续上一次旧的间隔量基础上继续等待触发。默认情况下让其从此刻重新开始计时触发。
注意:重新设置时间间隔量,可能存在比原来的间隔量大的可能性,也存在比原来的间隔量小的可能性,都需要考虑处理。
- 核心逻辑:
- 判断重新设定的时间间隔是否和原来相等,且重新设定是否从即刻生效。(如果和原来相等,并且即刻生效,其功能就会变成
refresh()
。) - 获取
TimerManager
的读写锁资源RWMutex
,加上写锁 - 从
TimerManaer
的定时器队列里找出对应的定时器移除,并且重新计算定时器的到期时间点:- 如果
from_now = true
,即:即刻生效。获取当前时间GetCurrentMs()
,将其作为新的时间起点,然后又加上新的时间间隔量。 - 如果
from_now = false
,即:不需要即刻生效。计算得到原来的时间起点原时间起点 = 原触发时间点 - 原时间间隔
,在原时间起点的基础上加上新的时间间隔来完成触发。也就是说这种重置定时器的做法,设置完之后的定时器已经过了一部分时间,计划等待时间 >= 实际等待时间
- 如果
- 写锁解锁
- 调用
TimerManager::addTimer()
将定时器重新加入队列,不直接使用set.insert()
的原因在于:当前定时器的到期时间点 < 队头定时器(队列到期时间点最近的定时器)到期时间点
,在加入队列时存在这种可能,需要tickle()
去唤醒epoll_wait
去修改其超时时间。
/**
* @brief 重新设定定时器间隔时间
* @param[in] ms 间隔时间
* @param[in] from_now 是否从当前时间点开始重新计时 默认是
* @return true 重置成功
* @return false 重置失败
*/
bool reset(uint64_t ms, bool from_now = true);
bool Timer::reset(uint64_t ms, bool from_now)
{
if(m_ms == ms && !from_now)
return true;
TimerManager::MutexType::WriteLock lock(m_manager->m_mutex);
if(!m_cb) //没有任务就要返回
return false;
auto it = m_manager->m_timers.find(shared_from_this());
if(it == m_manager->m_timers.end())
{
return false;
}
//注意:由于使用的set 要先移除再进行重置加入
m_manager->m_timers.erase(it);
uint64_t start = 0;
//重新从现在开始计时
if(from_now)
start = GetCurrentMs();
else //继续上一次的计时
start = m_next - m_ms;
//设置新的计时间隔
m_ms = ms;
//设置新的触发时间点
m_next = start + m_ms;
//写锁解锁
lock.unlock();
//因为reset可能出现执行时间变成最小的可能(放在队头) 会有一次唤醒
m_manager->addTimer(shared_from_this());
return true;
}
2.2 TimerManager类
功能:管理Timer
定时器队列,让其能够有序的执行到期后的定时任务。
置为一个虚基类,让其继承类通过纯虚函数virtual void onTimerInsertedAtFront() = 0
去加入相关的其他业务拓展,TimerManger
虚基类的基本功能就是负责管理一个定时器队列。
小技巧:Timer
内部构建仿函数作为set
容器比较的函数调用
目的:由于原本的set
容器根据存储对象类型,默认比较存储类型的值。(存int比较int,存指针比较指针)。但是存储定时器队列我们不希望是通过比较指针值来排列,我们希望通过比较到期时间来进行排列,故需要重置比较函数。
class Timer
{
....
private:
/**
* @brief 构建仿函数 用于set比较
*/
struct Comparator
{
bool operator()(const Timer::ptr& lhs, const Timer::ptr& rhs) const;
};
...
};
bool Timer::Comparator::operator()(const Timer::ptr& lhs, const Timer::ptr& rhs) const
{
/*判断地址*/
if(!lhs && !rhs)
return false;
if(!lhs)
return true;
if(!rhs)
return false;
/*都有值 判断到期时间*/
if(lhs->m_next < rhs->m_next)
return true;
if(lhs->m_next > rhs->m_next)
return false;
//到期时间相等的话 地址小的优先调度
return lhs.get() < rhs.get();
}
2.2.1 成员变量
class TimerManager
{
...
private:
//读写锁
MutexType m_mutex;
//定时器队列
std::set<Timer::ptr, Timer::Comparator> m_timers;
//避免频繁修改的一个ticked标记
bool m_tickled = false;
//旧的服务器的时间
uint64_t m_previousTime = 0;
};
2.2.2 接口
2.2.2.1 构造函数
/**
* @brief 定时器管理类构造函数
*/
TimerManager();
TimerManager::TimerManager()
{
//初始化时候 获取一次服务器时间
m_previousTime = GetCurrentMs();
}
2.2.2.2 addTimer()
功能:通过传参,创建并往定时器队列中添加一个定时器
注意:在这个调用接口中创建定时器,又因为Timer
构造函数已经置为私有属性,必须将TimerManager
置为Timer
的友元类方可调用。
//创建并添加定时器
Timer::ptr TimerManager::addTimer(uint64_t ms, std::function<void()> cb, bool recurring)
{
//在TimerManager 中构造Timer
Timer::ptr timer(new Timer(ms, cb, recurring, this));
addTimer(timer);
return timer;
}
如果不考虑外部访问的安全性,在构造函数私有情况下,也有不需要置为友元类也能创建对象的方法,但是该方法没有到达我们想要杜绝单独创建一个
Timer对象
的目的: ```cpp class Timer { public:static Timer::ptr getNewTimer() {
Timer::ptr timer(new Timer(ms, cb, recurring, IOMager::GetThis()));
...
reutrn timer;
}
private:
//Timer的构造函数为私有意味着不能显示创建对象 必须由TimerManager来创建
Timer(uint64_t ms, std::function
Timer(uint64_t next);
...
};
<a name="S5mMu"></a>
#### 2.2.2.3 `addTimer(Timer::ptr p)` (核心)
功能:通过传入定时器,往定时器队列中添加一个定时器
- **核心逻辑:**
1. 加写锁
1. 调用`set.insert()`插入定时器队列,并获取插入后的位置
1. 如果此时定时器被插入到队头,且唤醒标志`m_tickled == false`,认为此时队头的最近到期时间点发生改变`is_front = true`,需要去唤醒线程修改对应的`epoll_wait`的超时时间
1. 写锁解锁
1. 根据插入情况,选择唤醒
```cpp
/**
* @brief 以智能指针形式 添加条件定时器
* @param[in] p 定时器智能指针
*/
void addTimer(Timer::ptr p);
void TimerManager::addTimer(Timer::ptr p)
{
MutexType::WriteLock lock(m_mutex);
//insert返回两个值 插入后的迭代器位置/插入是否成功
//判断插入后是否是在队头 最小的到时时间
auto it = m_timers.insert(p).first;
bool is_front = (it == m_timers.begin()) && !m_tickled;
//频繁修改时候 避免总是去唤醒
if(is_front)
m_tickled = true;
lock.unlock();
//当有比之前更小的 定时器任务插入 就要通知epoll_wait那边去修改为更小的等待时间
//实际上:将挂起的协程唤醒,重新走一遍流程idle()---->run()--->idle()
if(is_front)
onTimerInsertedAtFront();
}
2.2.2.4 addConditionTimer()
功能:添加一个条件定时器,和普通定时器相比,多了一个用weak_ptr<>
弱指针指向shared_ptr<>
共享指针管理的”条件”,定时器触发时检查”条件”是否还存在,不存在将不执行所持有的任务
/**
* @brief 添加条件定时器辅助函数
* @param[in] weak_cond 判断条件是否存在的弱指针
* @param[in] cb 触发的回调函数
*/
static void OnTimer(std::weak_ptr<void> weak_cond, std::function<void()> cb)
{
//利用弱指针weak_ptr 来判断条件是否存在
std::shared_ptr<void> temp = weak_cond.lock();
if(temp)
{
cb();
}
}
/**
* @brief 添加条件定时器
* @param[in] ms 间隔时长
* @param[in] cb 执行的回调函数
* @param[in] weak_cond 判断条件是否还存在的弱指针
* @param[in] recurring 是否是循环定时器 默认不是
* @return Timer::ptr 添加完成后会将创建的定时器返回
*/
Timer::ptr addConditionTimer(uint64_t ms, std::function<void()> cb, std::weak_ptr<void> weak_cond, bool recurring = false);
Timer::ptr TimerManager::addConditionTimer(uint64_t ms, std::function<void()> cb, std::weak_ptr<void> weak_cond, bool recurring)
{
return addTimer(ms, std::bind(&OnTimer, weak_cond, cb), recurring);
}
2.2.2.5 getNextTime()
(核心)
功能:获取当前定时器队列队头的到期时间点的值。如果已经超出了触发时间点,返回0
/**
* @brief 获取队头定时器的到期时间点
* @return uint64_t
*/
uint64_t getNextTime();
uint64_t TimerManager::getNextTime()
{
MutexType::ReadLock lock(m_mutex);
//获取队头定时器时间一次 就可以去唤醒一次
m_tickled = false;
//定时器队列为空返回一个极大值
if(!m_timers.size())
return ~0ull;
const Timer::ptr& next = *m_timers.begin();
uint64_t now_ms = GetCurrentMs();
if(now_ms >= next->m_next) //现在获取的时间 已经晚于预计要触发的时间点 马上执行
return 0;
else //还没到预定时间就返回剩余时间间隔
return next->m_next - now_ms;
}
2.2.2.6 listExpiredCb()
(核心)
功能:检查当前的定时队列,找出所有已经到时需要触发的定时器,从定时器队列中移除,并且将这些已经要触发的定时器中的函数/协程全部取出放入一个目标容器内。
注意:处理服务器时间被修改的情况,会对定时器触发有一定的影响
**checkClockChange()**
(私有)
功能:探测当前服务器时间是否被往前调小了。被调小的判定:当前时间 < 上一次记录的时间 && 当前时间 < 上一次记录的时间的一个小时前的时间
, 重点检查时间被调小的情况因为可能会导致定时器队列里所有任务都触发。
/**
* @brief 探测当前服务器时间是否改变
* @param[in] now_ms 当前的时间点
* @return true 已经被修改
* @return false 没有被修改
*/
bool checkClockChange(uint64_t now_ms);
bool TimerManager::checkClockChange(uint64_t now_ms)
{
bool changed = false;
//当前时间小于上一次时间且比上一次时间一小时前还要小 就认为服务器时间被修改
if(now_ms < m_previousTime && now_ms < (m_previousTime - 60 * 60 * 1000))
changed = true;
m_previousTime = now_ms;
return changed;
}
/**
* @brief 找出已经到期的定时任务集合
* @param[out] cbs 存储全部已经到期的任务
*/
void listExpiredCb(std::vector<std::function<void()> >& cbs);
void TimerManager::listExpiredCb(std::vector<std::function<void()> >& cbs)
{
uint64_t now_ms = GetCurrentMs();
std::vector<std::shared_ptr<Timer> > expired;
{
MutexType::ReadLock lock(m_mutex);
if(!m_timers.size())
return;
}
MutexType::WriteLock lock(m_mutex);
if(!m_timers.size())
return;
bool changed = checkClockChange(now_ms);
//服务器时间没有发生改变 且不存在超时定时器 就退出函数
if(!changed && (*m_timers.begin())->m_next < now_ms)
return;
//构造一个装有当前时间的Timer为了应用lower_bound算法函数
Timer::ptr now_timer(new Timer(now_ms));
//如果服务器时间发生了变动 就返回end() 会将整个m_timers全部清理 重新加入队列
//否则 将找出大于等于当前时间的第一个Timer
auto it = changed ? m_timers.end() : m_timers.lower_bound(now_timer);
//找到还没到时的第一个定时器的位置后 停下
while(it != m_timers.end() && (*it)->m_next == now_ms)
++it;
//将队头~当前位置之前 所有已经到时的定时器全部拿出 到expired队列中
expired.insert(expired.begin(), m_timers.begin(), it);
//将原队列前部到期的定时器全部移除
m_timers.erase(m_timers.begin(), it);
//扩充可执行任务队列空间
cbs.reserve(expired.size());
for(auto &x : expired)
{
cbs.emplace_back(x->m_cb);
//循环定时器就放回 set中
if(x->m_recurring)
{
x->m_next = now_ms + x->m_ms;
m_timers.insert(x);
}
else
x->m_cb = nullptr;
}
}
C\C++知识点补充复习:fcntl函数
出处:《UNIX环境高级编程 第3版》P66
功能:修改已经打开文件的各种属性
#include <unistd.h>
#include <fcntl.h>
int fcntl(int fd, int cmd, ... /* arg */ );
具体功能:
- 复制已有的文件描述符
F_DUPFD
或F_DUPFD_CLOEXEC
- 获取/设置文件描述符标志
F_GETFD
和F_SETFD
- 获取/设置文件描述符状态
F_GETFL
和F_SETFL
- 获取/设置异步IO所有权
F_GETTOWN
和F_SETTOWN
- 获取/设置记录数
F_GETLK
和F_SETLK
、F_SETLKW
- 文件状态标志位:
C\C++知识点补充复习:算法lower_bound函数
头文件<algorithm>
功能:对于有序容器,返回一个迭代器,指向大于等于key的第一个元素。
触类旁通:upper_bound函数返回的迭代器,指向小于等于key的第一个元素