1)返回信令确认消息

/**
 * Handles a request received over the channel and replies to it through
 * `request->Accept(...)`.
 *
 * This is an excerpt: the spans marked "code omitted" were elided by the
 * author. The `switch` statement that the `case` labels below belong to is
 * presumably part of that omitted code.
 *
 * @param channel  Unused here (the parameter name is commented out).
 * @param request  The request to process; `request->internal` carries the
 *                 router id used by the router related methods.
 */
inline void Worker::OnChannelRequest(Channel::ChannelSocket* /*channel*/, Channel::ChannelRequest* request)
{
	/***************************** code omitted *****************************/

	case Channel::ChannelRequest::MethodId::WORKER_DUMP:
	{
		// Reply with whatever FillJson() puts into the object.
		json data = json::object();

		FillJson(data);

		request->Accept(data);

		break;
	}

	/***************************** code omitted *****************************/

	case Channel::ChannelRequest::MethodId::WORKER_GET_RESOURCE_USAGE:
	{
		// Reply with whatever FillJsonResourceUsage() puts into the object.
		json data = json::object();

		FillJsonResourceUsage(data);

		request->Accept(data);

		break;
	}

	/***************************** code omitted *****************************/

	case Channel::ChannelRequest::MethodId::WORKER_CREATE_ROUTER:
	{
		std::string routerId;

		// This may throw.
		SetNewRouterIdFromInternal(request->internal, routerId);

		// The new Router is stored in mapRouters and deleted in ROUTER_CLOSE.
		auto* router = new RTC::Router(routerId);

		this->mapRouters[routerId] = router;

		MS_DEBUG_DEV("Router created [routerId:%s]", routerId.c_str());

		request->Accept();

		break;
	}

	case Channel::ChannelRequest::MethodId::ROUTER_CLOSE:
	{
		// This may throw.
		RTC::Router* router = GetRouterFromInternal(request->internal);

		// Log while the Router is still alive: the message reads router->id,
		// so it must run before the object is deleted.
		MS_DEBUG_DEV("Router closed [id:%s]", router->id.c_str());

		// Remove it from the map and delete it.
		this->mapRouters.erase(router->id);
		delete router;

		request->Accept();

		break;
	}

	/***************************** code omitted *****************************/
}
应答流程
//没有传入参数void ChannelRequest::Accept(){MS_TRACE();MS_ASSERT(!this->replied, "request already replied");this->replied = true;json jsonResponse = json::object();jsonResponse["id"] = this->id;jsonResponse["accepted"] = true; ////ChannelSocket::Sendthis->channel->Send(jsonResponse);}//有传入参数void ChannelRequest::Accept(json& data){MS_TRACE();MS_ASSERT(!this->replied, "request already replied");this->replied = true;json jsonResponse = json::object();jsonResponse["id"] = this->id;jsonResponse["accepted"] = true;if (data.is_structured())jsonResponse["data"] = data;//ChannelSocket::Sendthis->channel->Send(jsonResponse);}
发送消息
/**
 * Serializes a JSON message and hands it to SendImpl().
 *
 * Nothing is sent when the socket is closed, or when the serialized text is
 * longer than NsPayloadMaxLen (an error is logged in that case).
 *
 * @param jsonMessage  Message to serialize with `dump()` and send.
 */
void ChannelSocket::Send(json& jsonMessage)
{
	MS_TRACE_STD();

	// A closed socket silently drops the message.
	if (this->closed)
	{
		return;
	}

	const std::string serialized = jsonMessage.dump();
	const bool fitsInPayload     = !(serialized.length() > NsPayloadMaxLen);

	if (fitsInPayload)
	{
		SendImpl(serialized.c_str(), serialized.length());
	}
	else
	{
		MS_ERROR_STD("mesage too big");
	}
}
Send 中调用的 SendImpl(message.c_str(), message.length()) 函数实现如下:
inline void ChannelSocket::SendImpl(const void* nsPayload, size_t nsPayloadLen){MS_TRACE_STD();size_t nsNumLen;if (nsPayloadLen == 0){nsNumLen = 1;this->writeBuffer[0] = '0';this->writeBuffer[1] = ':';this->writeBuffer[2] = ',';}else{nsNumLen = static_cast<size_t>(std::ceil(std::log10(static_cast<double>(nsPayloadLen) + 1)));std::sprintf(reinterpret_cast<char*>(this->writeBuffer), "%zu:", nsPayloadLen);std::memcpy(this->writeBuffer + nsNumLen + 1, nsPayload, nsPayloadLen);this->writeBuffer[nsNumLen + nsPayloadLen + 1] = ',';}size_t nsLen = nsNumLen + nsPayloadLen + 2;this->producerSocket.Write(this->writeBuffer, nsLen);}
2)创建Notifier

上面的是旧版本,新版本在mediasoup\worker\src\lib.cpp
extern "C" int run_worker(int argc,char* argv[],const char* version,int consumerChannelFd,int producerChannelFd,int payloadConsumeChannelFd,int payloadProduceChannelFd){/*****************************省略代码***************************************/Channel::ChannelNotifier::ClassInit(channel.get());/*****************************省略代码***************************************/// Run the Worker.Worker worker(channel.get(), payloadChannel.get());/*****************************省略代码***************************************/}
3)向上层发送通知
某些操作成功后,会向上层发送通知。
以上是旧版本,新版本在mediasoup\worker\src\Worker.cpp
/**
 * Constructs the Worker and runs the libuv loop.
 *
 * Registers itself as listener of both channels, installs the signals
 * handler, emits the "running" notification and then calls
 * DepLibUV::RunLoop(). The "libuv loop ended" message is logged after
 * RunLoop() returns, so the constructor does not return before that.
 *
 * @param channel         Channel socket to listen on.
 * @param payloadChannel  Payload channel socket to listen on.
 */
Worker::Worker(::Channel::ChannelSocket* channel, PayloadChannel::PayloadChannelSocket* payloadChannel)
  : channel(channel), payloadChannel(payloadChannel)
{
	MS_TRACE();

	// Set us as Channel's listener.
	this->channel->SetListener(this);

	// Set us as PayloadChannel's listener.
	this->payloadChannel->SetListener(this);

	// Set the signals handler.
	this->signalsHandler = new SignalsHandler(this);

#ifdef MS_EXECUTABLE
	{
		// Add signals to handle.
		this->signalsHandler->AddSignal(SIGINT, "INT");
		this->signalsHandler->AddSignal(SIGTERM, "TERM");
	}
#endif

	// Create the Checker instance in DepUsrSCTP.
	DepUsrSCTP::CreateChecker();

	// Tell the Node process that we are running.
	// This sends the notification to the upper layer.
	Channel::ChannelNotifier::Emit(std::to_string(Logger::pid), "running");

	MS_DEBUG_DEV("starting libuv loop");
	DepLibUV::RunLoop();
	MS_DEBUG_DEV("libuv loop ended");
}
在WebRtcTransport类中,很多函数都调用了
Channel::ChannelNotifier::Emit
例如其中一个
/**
 * Called when the ICE server reports completion.
 *
 * Emits an "icestatechange" notification with `iceState` set to "completed",
 * then calls MayRunDtlsTransport() and, if the DTLS transport is already in
 * the CONNECTED state, RTC::Transport::Connected().
 *
 * @param iceServer  Unused here (the parameter name is commented out).
 */
inline void WebRtcTransport::OnIceServerCompleted(const RTC::IceServer* /*iceServer*/)
{
	MS_TRACE();

	MS_DEBUG_TAG(ice, "ICE completed");

	// Notify the Node WebRtcTransport.
	json data = json::object();

	data["iceState"] = "completed";

	Channel::ChannelNotifier::Emit(this->id, "icestatechange", data);

	// If ready, run the DTLS handler.
	MayRunDtlsTransport();

	// If DTLS was already connected, notify the parent class.
	if (this->dtlsTransport->GetState() == RTC::DtlsTransport::DtlsState::CONNECTED)
	{
		RTC::Transport::Connected();
	}
}
