前言
webrtc下事件的发生
等待线程处理逻辑
内层循环,如果没有消息,则Wait,如果有则获取第一个,返回处理。
PhysicalSocketServer的Wait函数
源码分析
网络线程,PhysicalSocketServer,和事件
工作线程,使用的是NullSocketServer
H:\webrtc-20210315\webrtc-20210315\webrtc\webrtc-checkout\src\rtc_base\physical_socket_server.cc
PhysicalSocketServer::Wait
bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
// We don't support reentrant waiting.
RTC_DCHECK(!waiting_);
ScopedSetTrue s(&waiting_);
int64_t cmsTotal = cmsWait;
int64_t cmsElapsed = 0;
int64_t msStart = Time();
fWait_ = true;
while (fWait_) {
std::vector<WSAEVENT> events;
std::vector<uint64_t> event_owners;
// 将要监听的事件添加到数组中
events.push_back(socket_ev_);
{
CritScope cr(&crit_);
// Get a snapshot of all current dispatchers; this is used to avoid the
// ABA problem (see later comment) and avoids the dispatcher_by_key_
// iterator being invalidated by calling CheckSignalClose, which may
// remove the dispatcher from the list.
current_dispatcher_keys_.clear();
for (auto const& kv : dispatcher_by_key_) {
current_dispatcher_keys_.push_back(kv.first);
}
for (uint64_t key : current_dispatcher_keys_) {
if (!dispatcher_by_key_.count(key)) {
continue;
}
Dispatcher* disp = dispatcher_by_key_.at(key);
if (!disp)
continue;
if (!process_io && (disp != signal_wakeup_))
continue;
// 获取socket
SOCKET s = disp->GetSocket();
if (disp->CheckSignalClose()) {
// We just signalled close, don't poll this socket.
} else if (s != INVALID_SOCKET) { // 将socket和事件绑定,当事件发生时可以知道哪个socket触发的事件
WSAEventSelect(s, events[0],
FlagsToEvents(disp->GetRequestedEvents()));
} else { // 普通事件的socket
events.push_back(disp->GetWSAEvent());
event_owners.push_back(key);
}
}
}
// Which is shorter, the delay wait or the asked wait?
int64_t cmsNext;
if (cmsWait == kForever) {
cmsNext = cmsWait;
} else {
cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
}
// Wait for one of the events to signal
// 开始事件等待
DWORD dw =
WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()), &events[0],
false, static_cast<DWORD>(cmsNext), false);
// 事件发生后,开始判断并处理
if (dw == WSA_WAIT_FAILED) {
// Failed?
// TODO(pthatcher): need a better strategy than this!
WSAGetLastError();
RTC_NOTREACHED();
return false;
} else if (dw == WSA_WAIT_TIMEOUT) {
// Timeout?
return true;
} else {
// Figure out which one it is and call it
CritScope cr(&crit_);
int index = dw - WSA_WAIT_EVENT_0;
if (index > 0) {
--index; // The first event is the socket event
uint64_t key = event_owners[index];
if (!dispatcher_by_key_.count(key)) {
// The dispatcher could have been removed while waiting for events.
continue;
}
Dispatcher* disp = dispatcher_by_key_.at(key);
disp->OnEvent(0, 0);
} else if (process_io) {
// Iterate only on the dispatchers whose sockets were passed into
// WSAEventSelect; this avoids the ABA problem (a socket being
// destroyed and a new one created with the same SOCKET handle).
for (uint64_t key : current_dispatcher_keys_) {
if (!dispatcher_by_key_.count(key)) {
continue;
}
Dispatcher* disp = dispatcher_by_key_.at(key);
SOCKET s = disp->GetSocket();
if (s == INVALID_SOCKET)
continue;
WSANETWORKEVENTS wsaEvents;
int err = WSAEnumNetworkEvents(s, events[0], &wsaEvents);
if (err == 0) {
{
if ((wsaEvents.lNetworkEvents & FD_READ) &&
wsaEvents.iErrorCode[FD_READ_BIT] != 0) {
RTC_LOG(WARNING)
<< "PhysicalSocketServer got FD_READ_BIT error "
<< wsaEvents.iErrorCode[FD_READ_BIT];
}
if ((wsaEvents.lNetworkEvents & FD_WRITE) &&
wsaEvents.iErrorCode[FD_WRITE_BIT] != 0) {
RTC_LOG(WARNING)
<< "PhysicalSocketServer got FD_WRITE_BIT error "
<< wsaEvents.iErrorCode[FD_WRITE_BIT];
}
if ((wsaEvents.lNetworkEvents & FD_CONNECT) &&
wsaEvents.iErrorCode[FD_CONNECT_BIT] != 0) {
RTC_LOG(WARNING)
<< "PhysicalSocketServer got FD_CONNECT_BIT error "
<< wsaEvents.iErrorCode[FD_CONNECT_BIT];
}
if ((wsaEvents.lNetworkEvents & FD_ACCEPT) &&
wsaEvents.iErrorCode[FD_ACCEPT_BIT] != 0) {
RTC_LOG(WARNING)
<< "PhysicalSocketServer got FD_ACCEPT_BIT error "
<< wsaEvents.iErrorCode[FD_ACCEPT_BIT];
}
if ((wsaEvents.lNetworkEvents & FD_CLOSE) &&
wsaEvents.iErrorCode[FD_CLOSE_BIT] != 0) {
RTC_LOG(WARNING)
<< "PhysicalSocketServer got FD_CLOSE_BIT error "
<< wsaEvents.iErrorCode[FD_CLOSE_BIT];
}
}
uint32_t ff = 0;
int errcode = 0;
if (wsaEvents.lNetworkEvents & FD_READ)
ff |= DE_READ;
if (wsaEvents.lNetworkEvents & FD_WRITE)
ff |= DE_WRITE;
if (wsaEvents.lNetworkEvents & FD_CONNECT) {
if (wsaEvents.iErrorCode[FD_CONNECT_BIT] == 0) {
ff |= DE_CONNECT;
} else {
ff |= DE_CLOSE;
errcode = wsaEvents.iErrorCode[FD_CONNECT_BIT];
}
}
if (wsaEvents.lNetworkEvents & FD_ACCEPT)
ff |= DE_ACCEPT;
if (wsaEvents.lNetworkEvents & FD_CLOSE) {
ff |= DE_CLOSE;
errcode = wsaEvents.iErrorCode[FD_CLOSE_BIT];
}
if (ff != 0) {
disp->OnEvent(ff, errcode);
}
}
}
}
// Reset the network event until new activity occurs
WSAResetEvent(socket_ev_);
}
// Break?
if (!fWait_)
break;
cmsElapsed = TimeSince(msStart);
if ((cmsWait != kForever) && (cmsElapsed >= cmsWait)) {
break;
}
}
// Done
return true;
}
触发事件
void Thread::WakeUpSocketServer() {
ss_->WakeUp();
}
向网络线程发送任务
这个事件,就是由Thread::WakeUpSocketServer触发的。
void Thread::WakeUpSocketServer() {
ss_->WakeUp();
}
-》
void PhysicalSocketServer::WakeUp() {
signal_wakeup_->Signal();
}
-》
virtual void Signal() {
if (hev_ != nullptr)
WSASetEvent(hev_);
}
事件触发后,又来执行