1)返回信令确认消息
// Handles a request received on the channel and replies to it via
// request->Accept(), optionally attaching a JSON payload.
//
// This listing is an excerpt: the enclosing dispatch on the request method
// and several cases are omitted (marked below).
//
// Helpers commented "This may throw" can raise; in that case the case body is
// left before request->Accept() is reached.
inline void Worker::OnChannelRequest(Channel::ChannelSocket* /*channel*/, Channel::ChannelRequest* request)
{
	/***************************** code omitted ***************************************/
	case Channel::ChannelRequest::MethodId::WORKER_DUMP:
	{
		// Reply with the JSON filled in by FillJson().
		json data = json::object();
		FillJson(data);
		request->Accept(data);
		break;
	}
	/***************************** code omitted ***************************************/
	case Channel::ChannelRequest::MethodId::WORKER_GET_RESOURCE_USAGE:
	{
		// Reply with the JSON filled in by FillJsonResourceUsage().
		json data = json::object();
		FillJsonResourceUsage(data);
		request->Accept(data);
		break;
	}
	/***************************** code omitted ***************************************/
	case Channel::ChannelRequest::MethodId::WORKER_CREATE_ROUTER:
	{
		std::string routerId;
		// This may throw.
		SetNewRouterIdFromInternal(request->internal, routerId);
		// The Router is stored in mapRouters and deleted again in ROUTER_CLOSE.
		auto* router = new RTC::Router(routerId);
		this->mapRouters[routerId] = router;
		MS_DEBUG_DEV("Router created [routerId:%s]", routerId.c_str());
		request->Accept();
		break;
	}
	case Channel::ChannelRequest::MethodId::ROUTER_CLOSE:
	{
		// This may throw.
		RTC::Router* router = GetRouterFromInternal(request->internal);
		// Remove it from the map.
		this->mapRouters.erase(router->id);
		// Log while the Router is still alive: the message reads router->id,
		// so it must not run after the delete below (use-after-free).
		MS_DEBUG_DEV("Router closed [id:%s]", router->id.c_str());
		delete router;
		request->Accept();
		break;
	}
	/***************************** code omitted ***************************************/
}
应答流程
//没有传入参数
void ChannelRequest::Accept()
{
MS_TRACE();
MS_ASSERT(!this->replied, "request already replied");
this->replied = true;
json jsonResponse = json::object();
jsonResponse["id"] = this->id;
jsonResponse["accepted"] = true; //
//ChannelSocket::Send
this->channel->Send(jsonResponse);
}
// Overload with a payload: replies to this request as accepted and attaches
// `data` under the "data" key when data.is_structured() is true.
void ChannelRequest::Accept(json& data)
{
	MS_TRACE();

	// A request can only be replied to once.
	MS_ASSERT(!this->replied, "request already replied");
	this->replied = true;

	json response = json::object();

	response["id"]       = this->id;
	response["accepted"] = true;

	// Non-structured payloads are left out of the response.
	if (data.is_structured())
	{
		response["data"] = data;
	}

	// Delivered through ChannelSocket::Send.
	this->channel->Send(response);
}
发送消息
// Serializes `jsonMessage` with dump() and hands the text to SendImpl().
// Nothing is sent when the socket is closed or when the serialized text is
// longer than NsPayloadMaxLen (an error is logged in the latter case).
void ChannelSocket::Send(json& jsonMessage)
{
	MS_TRACE_STD();

	if (this->closed)
	{
		return;
	}

	const std::string serialized = jsonMessage.dump();
	const size_t serializedLen   = serialized.length();

	// Oversized messages are dropped, not truncated.
	if (serializedLen > NsPayloadMaxLen)
	{
		MS_ERROR_STD("mesage too big");

		return;
	}

	SendImpl(serialized.c_str(), serializedLen);
}
调用的SendImpl(message.c_str(), message.length());函数原型如下:
// Frames the payload into this->writeBuffer as "<decimal length>:<payload>,"
// and writes the framed bytes to this->producerSocket.
//
// nsPayload    - bytes to send; not read when nsPayloadLen is 0.
// nsPayloadLen - number of payload bytes.
//
// NOTE(review): no capacity check is done here. The visible caller, Send(),
// rejects payloads longer than NsPayloadMaxLen; this assumes writeBuffer is
// sized for that limit plus the framing bytes - confirm against the class
// declaration.
inline void ChannelSocket::SendImpl(const void* nsPayload, size_t nsPayloadLen)
{
	MS_TRACE_STD();

	// Number of decimal digits in the length prefix.
	size_t nsNumLen;

	if (nsPayloadLen == 0)
	{
		// Empty payload: the frame is exactly "0:,".
		nsNumLen             = 1;
		this->writeBuffer[0] = '0';
		this->writeBuffer[1] = ':';
		this->writeBuffer[2] = ',';
	}
	else
	{
		// Count the digits with integer arithmetic. Deriving the count from
		// ceil(log10(len + 1)) depends on log10 being exact at powers of ten;
		// a one-ulp error there would shift the payload and the trailing ','
		// to the wrong offset.
		nsNumLen = 0;
		for (size_t remaining = nsPayloadLen; remaining != 0; remaining /= 10)
		{
			++nsNumLen;
		}

		// Writes "<len>:" (plus a terminating NUL that the memcpy overwrites).
		std::sprintf(reinterpret_cast<char*>(this->writeBuffer), "%zu:", nsPayloadLen);
		std::memcpy(this->writeBuffer + nsNumLen + 1, nsPayload, nsPayloadLen);
		this->writeBuffer[nsNumLen + nsPayloadLen + 1] = ',';
	}

	// digits + ':' + payload + ','
	size_t nsLen = nsNumLen + nsPayloadLen + 2;

	this->producerSocket.Write(this->writeBuffer, nsLen);
}
2)创建Notifier
上面的是旧版本,新版本在mediasoup\worker\src\lib.cpp
// Worker entry point with C linkage (excerpt: most of the body is omitted in
// this listing, including the return statement).
//
// Visible here: Channel::ChannelNotifier::ClassInit() receives the channel
// socket, and afterwards a Worker is constructed from the channel and payload
// channel sockets.
//
// NOTE(review): `channel` and `payloadChannel` are declared in the omitted
// code - presumably owning pointers created from the fd parameters; confirm.
extern "C" int run_worker(
int argc,
char* argv[],
const char* version,
int consumerChannelFd,
int producerChannelFd,
int payloadConsumeChannelFd,
int payloadProduceChannelFd)
{
/***************************** code omitted ***************************************/
// Give the notifier the channel socket before the Worker is created.
Channel::ChannelNotifier::ClassInit(channel.get());
/***************************** code omitted ***************************************/
// Run the Worker.
Worker worker(channel.get(), payloadChannel.get());
/***************************** code omitted ***************************************/
}
3)向上层发送通知
某些操作成功后,会向上层发送通知。
以上是旧版本,新版本在mediasoup\worker\src\Worker.cpp
// Constructs the Worker and runs it.
//
// Stores the two sockets, registers this object as listener on both, creates
// the signals handler, emits the "running" notification and then calls
// DepLibUV::RunLoop(). The loop is run from inside the constructor; the
// "libuv loop ended" message is logged once RunLoop() returns.
//
// channel        - channel socket; this Worker becomes its listener.
// payloadChannel - payload channel socket; this Worker becomes its listener.
Worker::Worker(::Channel::ChannelSocket* channel, PayloadChannel::PayloadChannelSocket* payloadChannel)
: channel(channel), payloadChannel(payloadChannel)
{
MS_TRACE();
// Set us as Channel's listener.
this->channel->SetListener(this);
// Set us as PayloadChannel's listener.
this->payloadChannel->SetListener(this);
// Set the signals handler.
this->signalsHandler = new SignalsHandler(this);
// SIGINT and SIGTERM are only registered when MS_EXECUTABLE is defined.
#ifdef MS_EXECUTABLE
{
// Add signals to handle.
this->signalsHandler->AddSignal(SIGINT, "INT");
this->signalsHandler->AddSignal(SIGTERM, "TERM");
}
#endif
// Create the Checker instance in DepUsrSCTP.
DepUsrSCTP::CreateChecker();
// Tell the Node process that we are running.
// Send the notification to the upper layer; the first argument is the
// process id (Logger::pid) as a string.
Channel::ChannelNotifier::Emit(std::to_string(Logger::pid), "running");
MS_DEBUG_DEV("starting libuv loop");
DepLibUV::RunLoop();
MS_DEBUG_DEV("libuv loop ended");
}
在WebRtcTransport类中,很多函数都调用了
Channel::ChannelNotifier::Emit
例如其中一个
// IceServer callback for ICE completion: emits an "icestatechange"
// notification with iceState "completed", calls MayRunDtlsTransport(), and
// calls RTC::Transport::Connected() when the DTLS transport state is already
// CONNECTED.
inline void WebRtcTransport::OnIceServerCompleted(const RTC::IceServer* /*iceServer*/)
{
	MS_TRACE();

	MS_DEBUG_TAG(ice, "ICE completed");

	// Notify the Node WebRtcTransport.
	json notification = json::object();

	notification["iceState"] = "completed";

	Channel::ChannelNotifier::Emit(this->id, "icestatechange", notification);

	// If ready, run the DTLS handler.
	MayRunDtlsTransport();

	// If DTLS was already connected, notify the parent class.
	const bool dtlsConnected =
	  this->dtlsTransport->GetState() == RTC::DtlsTransport::DtlsState::CONNECTED;

	if (dtlsConnected)
		RTC::Transport::Connected();
}