前言

image.png

webrtc下事件的发生

image.png

等待线程处理逻辑

image.png
image.png
内层循环:如果没有消息,则调用Wait等待;如果有消息,则取出第一条并返回处理。

PhysicalSocketServer的Wait函数

image.png

源码分析

网络线程使用的是PhysicalSocketServer,通过事件(Windows下为WSAEVENT)进行等待和唤醒。
工作线程使用的是NullSocketServer。

H:\webrtc-20210315\webrtc-20210315\webrtc\webrtc-checkout\src\rtc_base\physical_socket_server.cc
PhysicalSocketServer::Wait

  // Windows (WSA event based) wait loop of PhysicalSocketServer.
  // Repeatedly waits on socket_ev_ plus the WSA events of socket-less
  // dispatchers and dispatches OnEvent() for whatever fired, until fWait_
  // becomes false or cmsWait milliseconds have elapsed (cmsWait == kForever
  // means no timeout).
  // When process_io is false, only the signal_wakeup_ dispatcher is waited on
  // and socket network events are not enumerated or dispatched.
  // Returns false only when WSAWaitForMultipleEvents fails; returns true on
  // timeout and on normal loop exit.
  1. bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
  2. // We don't support reentrant waiting.
  3. RTC_DCHECK(!waiting_);
  4. ScopedSetTrue s(&waiting_);
  5. int64_t cmsTotal = cmsWait;
  6. int64_t cmsElapsed = 0;
  7. int64_t msStart = Time();
  8. fWait_ = true;
  9. while (fWait_) {
  10. std::vector<WSAEVENT> events;
  11. std::vector<uint64_t> event_owners;
  12. // Add the events to wait on to the array; slot 0 is the shared socket event.
  13. events.push_back(socket_ev_);
  14. {
  15. CritScope cr(&crit_);
  16. // Get a snapshot of all current dispatchers; this is used to avoid the
  17. // ABA problem (see later comment) and avoids the dispatcher_by_key_
  18. // iterator being invalidated by calling CheckSignalClose, which may
  19. // remove the dispatcher from the list.
  20. current_dispatcher_keys_.clear();
  21. for (auto const& kv : dispatcher_by_key_) {
  22. current_dispatcher_keys_.push_back(kv.first);
  23. }
  24. for (uint64_t key : current_dispatcher_keys_) {
  25. if (!dispatcher_by_key_.count(key)) {
  26. continue;
  27. }
  28. Dispatcher* disp = dispatcher_by_key_.at(key);
  29. if (!disp)
  30. continue;
  31. if (!process_io && (disp != signal_wakeup_))
  32. continue;
  33. // Fetch the dispatcher's socket handle.
  34. SOCKET s = disp->GetSocket();
  35. if (disp->CheckSignalClose()) {
  36. // We just signalled close, don't poll this socket.
  37. } else if (s != INVALID_SOCKET) { // Associate the socket with the shared event (events[0]) for its requested network events; the per-socket WSAEnumNetworkEvents call below finds which socket fired.
  38. WSAEventSelect(s, events[0],
  39. FlagsToEvents(disp->GetRequestedEvents()));
  40. } else { // No socket: a plain event dispatcher, so wait on its own WSA event and remember its key.
  41. events.push_back(disp->GetWSAEvent());
  42. event_owners.push_back(key);
  43. }
  44. }
  45. }
  46. // Which is shorter, the delay wait or the asked wait?
  47. int64_t cmsNext;
  48. if (cmsWait == kForever) {
  49. cmsNext = cmsWait;
  50. } else {
  51. cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
  52. }
  53. // Wait for one of the events to signal
  54. // Start waiting on the collected events.
  55. DWORD dw =
  56. WSAWaitForMultipleEvents(static_cast<DWORD>(events.size()), &events[0],
  57. false, static_cast<DWORD>(cmsNext), false);
  58. // The wait returned: classify the result and handle it.
  59. if (dw == WSA_WAIT_FAILED) {
  60. // Failed?
  61. // TODO(pthatcher): need a better strategy than this!
  62. WSAGetLastError();
  63. RTC_NOTREACHED();
  64. return false;
  65. } else if (dw == WSA_WAIT_TIMEOUT) {
  66. // Timeout?
  67. return true;
  68. } else {
  69. // Figure out which one it is and call it
  70. CritScope cr(&crit_);
  71. int index = dw - WSA_WAIT_EVENT_0; // Position in events of the signalled handle; 0 is socket_ev_.
  72. if (index > 0) {
  73. --index; // The first event is the socket event
  74. uint64_t key = event_owners[index];
  75. if (!dispatcher_by_key_.count(key)) {
  76. // The dispatcher could have been removed while waiting for events.
  77. continue;
  78. }
  79. Dispatcher* disp = dispatcher_by_key_.at(key);
  80. disp->OnEvent(0, 0);
  81. } else if (process_io) {
  82. // Iterate only on the dispatchers whose sockets were passed into
  83. // WSAEventSelect; this avoids the ABA problem (a socket being
  84. // destroyed and a new one created with the same SOCKET handle).
  85. for (uint64_t key : current_dispatcher_keys_) {
  86. if (!dispatcher_by_key_.count(key)) {
  87. continue;
  88. }
  89. Dispatcher* disp = dispatcher_by_key_.at(key);
  90. SOCKET s = disp->GetSocket();
  91. if (s == INVALID_SOCKET)
  92. continue;
  93. WSANETWORKEVENTS wsaEvents;
  94. int err = WSAEnumNetworkEvents(s, events[0], &wsaEvents);
  95. if (err == 0) {
  96. {
  97. if ((wsaEvents.lNetworkEvents & FD_READ) &&
  98. wsaEvents.iErrorCode[FD_READ_BIT] != 0) {
  99. RTC_LOG(WARNING)
  100. << "PhysicalSocketServer got FD_READ_BIT error "
  101. << wsaEvents.iErrorCode[FD_READ_BIT];
  102. }
  103. if ((wsaEvents.lNetworkEvents & FD_WRITE) &&
  104. wsaEvents.iErrorCode[FD_WRITE_BIT] != 0) {
  105. RTC_LOG(WARNING)
  106. << "PhysicalSocketServer got FD_WRITE_BIT error "
  107. << wsaEvents.iErrorCode[FD_WRITE_BIT];
  108. }
  109. if ((wsaEvents.lNetworkEvents & FD_CONNECT) &&
  110. wsaEvents.iErrorCode[FD_CONNECT_BIT] != 0) {
  111. RTC_LOG(WARNING)
  112. << "PhysicalSocketServer got FD_CONNECT_BIT error "
  113. << wsaEvents.iErrorCode[FD_CONNECT_BIT];
  114. }
  115. if ((wsaEvents.lNetworkEvents & FD_ACCEPT) &&
  116. wsaEvents.iErrorCode[FD_ACCEPT_BIT] != 0) {
  117. RTC_LOG(WARNING)
  118. << "PhysicalSocketServer got FD_ACCEPT_BIT error "
  119. << wsaEvents.iErrorCode[FD_ACCEPT_BIT];
  120. }
  121. if ((wsaEvents.lNetworkEvents & FD_CLOSE) &&
  122. wsaEvents.iErrorCode[FD_CLOSE_BIT] != 0) {
  123. RTC_LOG(WARNING)
  124. << "PhysicalSocketServer got FD_CLOSE_BIT error "
  125. << wsaEvents.iErrorCode[FD_CLOSE_BIT];
  126. }
  127. }
  128. uint32_t ff = 0;
  129. int errcode = 0;
  130. if (wsaEvents.lNetworkEvents & FD_READ)
  131. ff |= DE_READ;
  132. if (wsaEvents.lNetworkEvents & FD_WRITE)
  133. ff |= DE_WRITE;
  134. if (wsaEvents.lNetworkEvents & FD_CONNECT) {
  135. if (wsaEvents.iErrorCode[FD_CONNECT_BIT] == 0) {
  136. ff |= DE_CONNECT;
  137. } else {
  138. ff |= DE_CLOSE;
  139. errcode = wsaEvents.iErrorCode[FD_CONNECT_BIT];
  140. }
  141. }
  142. if (wsaEvents.lNetworkEvents & FD_ACCEPT)
  143. ff |= DE_ACCEPT;
  144. if (wsaEvents.lNetworkEvents & FD_CLOSE) {
  145. ff |= DE_CLOSE;
  146. errcode = wsaEvents.iErrorCode[FD_CLOSE_BIT];
  147. }
  148. if (ff != 0) {
  149. disp->OnEvent(ff, errcode);
  150. }
  151. }
  152. }
  153. }
  154. // Reset the network event until new activity occurs
  155. WSAResetEvent(socket_ev_);
  156. }
  157. // Break?
  158. if (!fWait_)
  159. break;
  160. cmsElapsed = TimeSince(msStart);
  161. if ((cmsWait != kForever) && (cmsElapsed >= cmsWait)) {
  162. break;
  163. }
  164. }
  165. // Done
  166. return true;
  167. }

触发事件

// Forwards the wake-up request to this thread's socket server (ss_).
void Thread::WakeUpSocketServer() {
ss_->WakeUp();
}
image.png
image.png

向网络线程发送任务

image.png
这个事件,就是由Thread::WakeUpSocketServer触发的。

  // Call chain of a wake-up, outermost caller first: the thread asks its
  // socket server to wake up, the socket server signals signal_wakeup_, and
  // Signal() sets the WSA event hev_.
  1. void Thread::WakeUpSocketServer() {
  2. ss_->WakeUp();
  3. }
  4. -》
  5. void PhysicalSocketServer::WakeUp() {
  6. signal_wakeup_->Signal();
  7. }
  8. -》
  9. virtual void Signal() {
  10. if (hev_ != nullptr)
  11. WSASetEvent(hev_); // Only signal when the event handle exists.
  12. }

事件触发后,又来执行
image.png