1)返回信令确认消息

图片.png

  1. inline void Worker::OnChannelRequest(Channel::ChannelSocket* /*channel*/, Channel::ChannelRequest* request)
  2. {
  3. /*****************************省略代码***************************************/
  4. case Channel::ChannelRequest::MethodId::WORKER_DUMP:
  5. {
  6. json data = json::object();
  7. FillJson(data);
  8. request->Accept(data);
  9. break;
  10. }
  11. /*****************************省略代码***************************************/
  12. case Channel::ChannelRequest::MethodId::WORKER_GET_RESOURCE_USAGE:
  13. {
  14. json data = json::object();
  15. FillJsonResourceUsage(data);
  16. request->Accept(data);
  17. break;
  18. }
  19. /*****************************省略代码***************************************/
  20. case Channel::ChannelRequest::MethodId::WORKER_CREATE_ROUTER:
  21. {
  22. std::string routerId;
  23. // This may throw.
  24. SetNewRouterIdFromInternal(request->internal, routerId);
  25. auto* router = new RTC::Router(routerId);
  26. this->mapRouters[routerId] = router;
  27. MS_DEBUG_DEV("Router created [routerId:%s]", routerId.c_str());
  28. request->Accept();
  29. break;
  30. }
  31. case Channel::ChannelRequest::MethodId::ROUTER_CLOSE:
  32. {
  33. // This may throw.
  34. RTC::Router* router = GetRouterFromInternal(request->internal);
  35. // Remove it from the map and delete it.
  36. this->mapRouters.erase(router->id);
  37. delete router;
  38. MS_DEBUG_DEV("Router closed [id:%s]", router->id.c_str());
  39. request->Accept();
  40. break;
  41. }
  42. /*****************************省略代码***************************************/
  43. }

应答流程

  1. //没有传入参数
  2. void ChannelRequest::Accept()
  3. {
  4. MS_TRACE();
  5. MS_ASSERT(!this->replied, "request already replied");
  6. this->replied = true;
  7. json jsonResponse = json::object();
  8. jsonResponse["id"] = this->id;
  9. jsonResponse["accepted"] = true; //
  10. //ChannelSocket::Send
  11. this->channel->Send(jsonResponse);
  12. }
  13. //有传入参数
  14. void ChannelRequest::Accept(json& data)
  15. {
  16. MS_TRACE();
  17. MS_ASSERT(!this->replied, "request already replied");
  18. this->replied = true;
  19. json jsonResponse = json::object();
  20. jsonResponse["id"] = this->id;
  21. jsonResponse["accepted"] = true;
  22. if (data.is_structured())
  23. jsonResponse["data"] = data;
  24. //ChannelSocket::Send
  25. this->channel->Send(jsonResponse);
  26. }

发送消息

  1. void ChannelSocket::Send(json& jsonMessage)
  2. {
  3. MS_TRACE_STD();
  4. if (this->closed)
  5. return;
  6. std::string message = jsonMessage.dump();
  7. if (message.length() > NsPayloadMaxLen)
  8. {
  9. MS_ERROR_STD("mesage too big");
  10. return;
  11. }
  12. SendImpl(message.c_str(), message.length());
  13. }

Send 中调用的 SendImpl(message.c_str(), message.length()) 的实现如下:

  1. inline void ChannelSocket::SendImpl(const void* nsPayload, size_t nsPayloadLen)
  2. {
  3. MS_TRACE_STD();
  4. size_t nsNumLen;
  5. if (nsPayloadLen == 0)
  6. {
  7. nsNumLen = 1;
  8. this->writeBuffer[0] = '0';
  9. this->writeBuffer[1] = ':';
  10. this->writeBuffer[2] = ',';
  11. }
  12. else
  13. {
  14. nsNumLen = static_cast<size_t>(std::ceil(std::log10(static_cast<double>(nsPayloadLen) + 1)));
  15. std::sprintf(reinterpret_cast<char*>(this->writeBuffer), "%zu:", nsPayloadLen);
  16. std::memcpy(this->writeBuffer + nsNumLen + 1, nsPayload, nsPayloadLen);
  17. this->writeBuffer[nsNumLen + nsPayloadLen + 1] = ',';
  18. }
  19. size_t nsLen = nsNumLen + nsPayloadLen + 2;
  20. this->producerSocket.Write(this->writeBuffer, nsLen);
  21. }

2)创建Notifier

图片.png
上面的是旧版本,新版本在mediasoup\worker\src\lib.cpp

  1. extern "C" int run_worker(
  2. int argc,
  3. char* argv[],
  4. const char* version,
  5. int consumerChannelFd,
  6. int producerChannelFd,
  7. int payloadConsumeChannelFd,
  8. int payloadProduceChannelFd)
  9. {
  10. /*****************************省略代码***************************************/
  11. Channel::ChannelNotifier::ClassInit(channel.get());
  12. /*****************************省略代码***************************************/
  13. // Run the Worker.
  14. Worker worker(channel.get(), payloadChannel.get());
  15. /*****************************省略代码***************************************/
  16. }

3)向上层发送通知

某些操作成功后,会向上层发送通知。
图片.png
以上是旧版本,新版本在 mediasoup\worker\src\Worker.cpp 中:

  1. Worker::Worker(::Channel::ChannelSocket* channel, PayloadChannel::PayloadChannelSocket* payloadChannel)
  2. : channel(channel), payloadChannel(payloadChannel)
  3. {
  4. MS_TRACE();
  5. // Set us as Channel's listener.
  6. this->channel->SetListener(this);
  7. // Set us as PayloadChannel's listener.
  8. this->payloadChannel->SetListener(this);
  9. // Set the signals handler.
  10. this->signalsHandler = new SignalsHandler(this);
  11. #ifdef MS_EXECUTABLE
  12. {
  13. // Add signals to handle.
  14. this->signalsHandler->AddSignal(SIGINT, "INT");
  15. this->signalsHandler->AddSignal(SIGTERM, "TERM");
  16. }
  17. #endif
  18. // Create the Checker instance in DepUsrSCTP.
  19. DepUsrSCTP::CreateChecker();
  20. // Tell the Node process that we are running.
  21. //去上层发送通知
  22. Channel::ChannelNotifier::Emit(std::to_string(Logger::pid), "running");
  23. MS_DEBUG_DEV("starting libuv loop");
  24. DepLibUV::RunLoop();
  25. MS_DEBUG_DEV("libuv loop ended");
  26. }

在 WebRtcTransport 类中,很多函数都调用了 Channel::ChannelNotifier::Emit 向上层发送通知。
图片.png
例如其中一个

  1. inline void WebRtcTransport::OnIceServerCompleted(const RTC::IceServer* /*iceServer*/)
  2. {
  3. MS_TRACE();
  4. MS_DEBUG_TAG(ice, "ICE completed");
  5. // Notify the Node WebRtcTransport.
  6. json data = json::object();
  7. data["iceState"] = "completed";
  8. Channel::ChannelNotifier::Emit(this->id, "icestatechange", data);
  9. // If ready, run the DTLS handler.
  10. MayRunDtlsTransport();
  11. // If DTLS was already connected, notify the parent class.
  12. if (this->dtlsTransport->GetState() == RTC::DtlsTransport::DtlsState::CONNECTED)
  13. {
  14. RTC::Transport::Connected();
  15. }
  16. }