前言
webrtc下事件的发生
等待线程处理逻辑


内层循环,如果没有消息,则Wait,如果有则获取第一个,返回处理。
PhysicalSocketServer的Wait函数

源码分析
网络线程,PhysicalSocketServer,和事件
工作线程,使用的是NullSocketServer
H:\webrtc-20210315\webrtc-20210315\webrtc\webrtc-checkout\src\rtc_base\physical_socket_server.cc
PhysicalSocketServer::Wait
bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {// We don't support reentrant waiting.RTC_DCHECK(!waiting_);ScopedSetTrue s(&waiting_);int64_t cmsTotal = cmsWait;int64_t cmsElapsed = 0;int64_t msStart = Time();fWait_ = true;while (fWait_) {std::vector<WSAEVENT> events;std::vector<uint64_t> event_owners;// 将要监听的事件添加到数组中events.push_back(socket_ev_);{CritScope cr(&crit_);// Get a snapshot of all current dispatchers; this is used to avoid the// ABA problem (see later comment) and avoids the dispatcher_by_key_// iterator being invalidated by calling CheckSignalClose, which may// remove the dispatcher from the list.current_dispatcher_keys_.clear();for (auto const& kv : dispatcher_by_key_) {current_dispatcher_keys_.push_back(kv.first);}for (uint64_t key : current_dispatcher_keys_) {if (!dispatcher_by_key_.count(key)) {continue;}Dispatcher* disp = dispatcher_by_key_.at(key);if (!disp)continue;if (!process_io && (disp != signal_wakeup_))continue;// 获取socketSOCKET s = disp->GetSocket();if (disp->CheckSignalClose()) {// We just signalled close, don't poll this socket.} else if (s != INVALID_SOCKET) { // 将socket和事件绑定,当事件发生时可以知道哪个socket触发的事件WSAEventSelect(s, events[0],FlagsToEvents(disp->GetRequestedEvents()));} else { // 普通事件的socketevents.push_back(disp->GetWSAEvent());event_owners.push_back(key);}}}// Which is shorter, the delay wait or the asked wait?int64_t cmsNext;if (cmsWait == kForever) {cmsNext = cmsWait;} else {cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);}// Wait for one of the events to signal// 开始事件等待DWORD dw =WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()), &events[0],false, static_cast<DWORD>(cmsNext), false);// 事件发生后,开始判断并处理if (dw == WSA_WAIT_FAILED) {// Failed?// TODO(pthatcher): need a better strategy than this!WSAGetLastError();RTC_NOTREACHED();return false;} else if (dw == WSA_WAIT_TIMEOUT) {// Timeout?return true;} else {// Figure out which one it is and call itCritScope cr(&crit_);int index = dw - WSA_WAIT_EVENT_0;if (index > 0) {--index; // The first event is the socket eventuint64_t key = event_owners[index];if (!dispatcher_by_key_.count(key)) {// The dispatcher could have been removed while waiting for events.continue;}Dispatcher* disp = dispatcher_by_key_.at(key);disp->OnEvent(0, 0);} else if (process_io) {// Iterate only on the dispatchers whose sockets were passed into// WSAEventSelect; this avoids the ABA problem (a socket being// destroyed and a new one created with the same SOCKET handle).for (uint64_t key : current_dispatcher_keys_) {if (!dispatcher_by_key_.count(key)) {continue;}Dispatcher* disp = dispatcher_by_key_.at(key);SOCKET s = disp->GetSocket();if (s == INVALID_SOCKET)continue;WSANETWORKEVENTS wsaEvents;int err = WSAEnumNetworkEvents(s, events[0], &wsaEvents);if (err == 0) {{if ((wsaEvents.lNetworkEvents & FD_READ) &&wsaEvents.iErrorCode[FD_READ_BIT] != 0) {RTC_LOG(WARNING)<< "PhysicalSocketServer got FD_READ_BIT error "<< wsaEvents.iErrorCode[FD_READ_BIT];}if ((wsaEvents.lNetworkEvents & FD_WRITE) &&wsaEvents.iErrorCode[FD_WRITE_BIT] != 0) {RTC_LOG(WARNING)<< "PhysicalSocketServer got FD_WRITE_BIT error "<< wsaEvents.iErrorCode[FD_WRITE_BIT];}if ((wsaEvents.lNetworkEvents & FD_CONNECT) &&wsaEvents.iErrorCode[FD_CONNECT_BIT] != 0) {RTC_LOG(WARNING)<< "PhysicalSocketServer got FD_CONNECT_BIT error "<< wsaEvents.iErrorCode[FD_CONNECT_BIT];}if ((wsaEvents.lNetworkEvents & FD_ACCEPT) &&wsaEvents.iErrorCode[FD_ACCEPT_BIT] != 0) {RTC_LOG(WARNING)<< "PhysicalSocketServer got FD_ACCEPT_BIT error "<< wsaEvents.iErrorCode[FD_ACCEPT_BIT];}if ((wsaEvents.lNetworkEvents & FD_CLOSE) &&wsaEvents.iErrorCode[FD_CLOSE_BIT] != 0) {RTC_LOG(WARNING)<< "PhysicalSocketServer got FD_CLOSE_BIT error "<< wsaEvents.iErrorCode[FD_CLOSE_BIT];}}uint32_t ff = 0;int errcode = 0;if (wsaEvents.lNetworkEvents & FD_READ)ff |= DE_READ;if (wsaEvents.lNetworkEvents & FD_WRITE)ff |= DE_WRITE;if (wsaEvents.lNetworkEvents & FD_CONNECT) {if (wsaEvents.iErrorCode[FD_CONNECT_BIT] == 0) {ff |= DE_CONNECT;} else {ff |= DE_CLOSE;errcode = wsaEvents.iErrorCode[FD_CONNECT_BIT];}}if (wsaEvents.lNetworkEvents & FD_ACCEPT)ff |= DE_ACCEPT;if (wsaEvents.lNetworkEvents & FD_CLOSE) {ff |= DE_CLOSE;errcode = wsaEvents.iErrorCode[FD_CLOSE_BIT];}if (ff != 0) {disp->OnEvent(ff, errcode);}}}}// Reset the network event until new activity occursWSAResetEvent(socket_ev_);}// Break?if (!fWait_)break;cmsElapsed = TimeSince(msStart);if ((cmsWait != kForever) && (cmsElapsed >= cmsWait)) {break;}}// Donereturn true;}
触发事件
void Thread::WakeUpSocketServer() {
ss_->WakeUp();
}

向网络线程发送任务

这个事件,就是由Thread::WakeUpSocketServer触发的。
void Thread::WakeUpSocketServer() {ss_->WakeUp();}-》void PhysicalSocketServer::WakeUp() {signal_wakeup_->Signal();}-》virtual void Signal() {if (hev_ != nullptr)WSASetEvent(hev_);}
事件触发后,又来执行
