前言
peerconnection收到sdp封装后的message后,将其解析。
1、发送消息
void PeerConnectionClient::SendToPeer(int peer_id, const std::string& message)
2、接收消息
void PeerConnectionClient::OnMessageFromPeer(int peer_id, const std::string& message)
PeerConnectionClient::OnMessageFromPeer
// Routes a message received from a remote peer to the registered callback.
// A payload equal to kByeMessage is reported through OnPeerDisconnected();
// any other payload is forwarded unchanged through OnMessageFromPeer().
void PeerConnectionClient::OnMessageFromPeer(int peer_id,
                                             const std::string& message) {
  // The "- 1" presumably discounts the terminating NUL of a char-array
  // constant -- kByeMessage is declared outside this excerpt.
  const bool is_bye = message.length() == (sizeof(kByeMessage) - 1) &&
                      message.compare(kByeMessage) == 0;
  if (is_bye) {
    callback_->OnPeerDisconnected(peer_id);
    return;
  }
  callback_->OnMessageFromPeer(peer_id, message);
}
里面调用的OnMessageFromPeer是 Conductor::OnMessageFromPeer
Conductor::OnMessageFromPeer
void Conductor::OnMessageFromPeer(int peer_id, const std::string& message) {
RTC_DCHECK(peer_id_ == peer_id || peer_id_ == -1);
RTC_DCHECK(!message.empty());
if (!peer_connection_.get()) {
RTC_DCHECK(peer_id_ == -1);
peer_id_ = peer_id;
if (!InitializePeerConnection()) {
RTC_LOG(LS_ERROR) << "Failed to initialize our PeerConnection instance";
client_->SignOut();
return;
}
} else if (peer_id != peer_id_) {
RTC_DCHECK(peer_id_ != -1);
RTC_LOG(WARNING)
<< "Received a message from unknown peer while already in a "
"conversation with a different peer.";
return;
}
// json解析接收到的对端消息
Json::Reader reader;
Json::Value jmessage;
if (!reader.parse(message, jmessage)) {
RTC_LOG(WARNING) << "Received unknown message. " << message;
return;
}
std::string type_str;
std::string json_object;
// 获取信令类型,type_str为answer或者offer,
// 如果是本端发起通信,第一次会收到answer类型,后面收到的就是空类型
rtc::GetStringFromJsonObject(jmessage, kSessionDescriptionTypeName,
&type_str);
if (!type_str.empty()) { // 类型不为空
if (type_str == "offer-loopback") {
// This is a loopback call.
// Recreate the peerconnection with DTLS disabled.
if (!ReinitializePeerConnectionForLoopback()) {
RTC_LOG(LS_ERROR) << "Failed to initialize our PeerConnection instance";
DeletePeerConnection();
client_->SignOut();
}
return;
}
// 获取sdp类型
absl::optional<webrtc::SdpType> type_maybe =
webrtc::SdpTypeFromString(type_str);
if (!type_maybe) {
RTC_LOG(LS_ERROR) << "Unknown SDP type: " << type_str;
return;
}
webrtc::SdpType type = *type_maybe;
// 从json中解析出一个个的文本sdp消息
std::string sdp;
if (!rtc::GetStringFromJsonObject(jmessage, kSessionDescriptionSdpName,
&sdp)) {
RTC_LOG(WARNING) << "Can't parse received session description message.";
return;
}
// 创建SessionDescriptionInterface对象保存sdp
webrtc::SdpParseError error;
std::unique_ptr<webrtc::SessionDescriptionInterface> session_description =
webrtc::CreateSessionDescription(type, sdp, &error);
if (!session_description) {
RTC_LOG(WARNING) << "Can't parse received session description message. "
"SdpParseError was: "
<< error.description;
return;
}
RTC_LOG(INFO) << " Received session description :" << message;
// 设置远端的消息保存到内存
peer_connection_->SetRemoteDescription(
DummySetSessionDescriptionObserver::Create(),
session_description.release());
// 如果是offer消息,则代表是对端发起通信,需要发给对端answer
if (type == webrtc::SdpType::kOffer) {
peer_connection_->CreateAnswer(
this, webrtc::PeerConnectionInterface::RTCOfferAnswerOptions());
}
} else {
std::string sdp_mid;
int sdp_mlineindex = 0;
// 获取sdp信息
std::string sdp;
if (!rtc::GetStringFromJsonObject(jmessage, kCandidateSdpMidName,
&sdp_mid) ||
!rtc::GetIntFromJsonObject(jmessage, kCandidateSdpMlineIndexName,
&sdp_mlineindex) ||
!rtc::GetStringFromJsonObject(jmessage, kCandidateSdpName, &sdp)) {
RTC_LOG(WARNING) << "Can't parse received message.";
return;
}
// 根据sdp,创建candidate
webrtc::SdpParseError error;
std::unique_ptr<webrtc::IceCandidateInterface> candidate(
webrtc::CreateIceCandidate(sdp_mid, sdp_mlineindex, sdp, &error));
if (!candidate.get()) {
RTC_LOG(WARNING) << "Can't parse received candidate message. "
"SdpParseError was: "
<< error.description;
return;
}
// 添加AddIceCandidate
if (!peer_connection_->AddIceCandidate(candidate.get())) {
RTC_LOG(WARNING) << "Failed to apply the received candidate";
return;
}
RTC_LOG(INFO) << " Received candidate :" << message;
}
}
取出type_str和sdp后,创建session_description,
std::unique_ptr<webrtc::SessionDescriptionInterface> session_description =
webrtc::CreateSessionDescription(type, sdp, &error);
webrtc::CreateSessionDescription
// Creates a JsepSessionDescription of the given |type|.
//
// For every type except SdpType::kRollback the text in |sdp| is parsed with
// SdpDeserialize(); |error_out| is handed to the parser unchanged. Returns
// nullptr when parsing fails, otherwise the new description.
std::unique_ptr<SessionDescriptionInterface> CreateSessionDescription(
    SdpType type,
    const std::string& sdp,
    SdpParseError* error_out) {
  auto description = std::make_unique<JsepSessionDescription>(type);

  // Rollback descriptions skip the parsing step entirely.
  const bool needs_parsing = (type != SdpType::kRollback);
  if (needs_parsing && !SdpDeserialize(sdp, description.get(), error_out)) {
    return nullptr;
  }

  // Explicit move retained: the returned pointer type differs from the
  // local's type.
  return std::move(description);
}
里面主要是调用了SdpDeserialize,反序列化解析sdp文本出来。跟之前的序列化SdpSerialize函数相对应。
SdpDeserialize
// Parses the SDP text in |message| and stores the result in |jdesc|.
//
// The session-level part is parsed first, then the media descriptions; both
// parsers share |current_pos| as their cursor into |message|. On success
// |jdesc| is initialized with the parsed description, session id and session
// version, and every candidate collected while parsing the media sections is
// added to it.
//
// Returns false as soon as either parsing step fails; |error| is passed to
// the parsers so they can report details. |jdesc| is not touched in that
// case.
bool SdpDeserialize(const std::string& message,
                    JsepSessionDescription* jdesc,
                    SdpParseError* error) {
  std::string session_id;
  std::string session_version;
  TransportDescription session_td("", "");
  RtpHeaderExtensions session_extmaps;
  rtc::SocketAddress session_connection_addr;
  auto desc = std::make_unique<cricket::SessionDescription>();
  size_t current_pos = 0;

  // Session Description
  if (!ParseSessionDescription(message, &current_pos, &session_id,
                               &session_version, &session_td, &session_extmaps,
                               &session_connection_addr, desc.get(), error)) {
    return false;
  }

  // Media Description
  std::vector<std::unique_ptr<JsepIceCandidate>> candidates;
  if (!ParseMediaDescription(message, session_td, session_extmaps, &current_pos,
                             session_connection_addr, desc.get(), &candidates,
                             error)) {
    return false;
  }

  jdesc->Initialize(std::move(desc), session_id, session_version);

  for (const auto& candidate : candidates) {
    jdesc->AddCandidate(candidate.get());
  }
  return true;
}
重点看媒体会话描述ParseMediaDescription
ParseMediaDescription
// NOTE: abridged excerpt. The "***" / "**" markers stand for code that was
// left out of this listing (including the parameter list), so the braces
// below are intentionally unbalanced.
//
// Iterates over the "m=" lines of |message|. For each one it reads the port,
// protocol and payload-type list, parses the section body into a content
// description chosen by media type, and adds the content plus its
// TransportInfo to |desc|. Returns false on the first parse failure.
bool ParseMediaDescription(***){
  ***
  // Zero or more media descriptions
  // RFC 4566
  // m=<media> <port> <proto> <fmt>  -- fetch each m= line in turn.
  while (GetLineWithType(message, pos, &line, kLineTypeMedia)) {
    ++mline_index;
    // Split the line on spaces and store the pieces in |fields|.
    std::vector<std::string> fields;
    rtc::split(line.substr(kLinePrefixLength), kSdpDelimiterSpaceChar, &fields);
    const size_t expected_min_fields = 4;
    if (fields.size() < expected_min_fields) {
      return ParseFailedExpectMinFieldNum(line, expected_min_fields, error);
    }
    bool port_rejected = false;
    // RFC 3264
    // To reject an offered stream, the port number in the corresponding stream
    // in the answer MUST be set to zero.
    if (fields[1] == kMediaPortRejected) {
      port_rejected = true;
    }
    int port = 0;
    if (!rtc::FromString<int>(fields[1], &port) || !IsValidPort(port)) {
      return ParseFailed(line, "The port number is invalid", error);
    }
    const std::string& protocol = fields[2];
    // <fmt>: collect the payload types.
    std::vector<int> payload_types;
    if (cricket::IsRtpProtocol(protocol)) {
      for (size_t j = 3; j < fields.size(); ++j) {
        int pl = 0;
        if (!GetPayloadTypeFromString(line, fields[j], &pl, error)) {
          return false;
        }
        payload_types.push_back(pl);
      }
    }
    // Build a temporary TransportDescription.
    // Make a temporary TransportDescription based on |session_td|.
    // Some of this gets overwritten by ParseContent.
    TransportDescription transport(
        session_td.transport_options, session_td.ice_ufrag, session_td.ice_pwd,
        session_td.ice_mode, session_td.connection_role,
        session_td.identity_fingerprint.get());
    // Parse the media information.
    std::unique_ptr<MediaContentDescription> content;
    std::string content_name;
    bool bundle_only = false;
    int section_msid_signaling = 0;
    const std::string& media_type = fields[0];
    if ((media_type == kMediaTypeVideo || media_type == kMediaTypeAudio) &&
        !cricket::IsRtpProtocol(protocol)) {
      return ParseFailed(line, "Unsupported protocol for media type", error);
    }
    if (media_type == kMediaTypeVideo) {  // Video.
      content = ParseContentDescription<VideoContentDescription>(
          message, cricket::MEDIA_TYPE_VIDEO, mline_index, protocol,
          payload_types, pos, &content_name, &bundle_only,
          &section_msid_signaling, &transport, candidates, error);
    } else if (media_type == kMediaTypeAudio) {
      content = ParseContentDescription<AudioContentDescription>(
          message, cricket::MEDIA_TYPE_AUDIO, mline_index, protocol,
          payload_types, pos, &content_name, &bundle_only,
          &section_msid_signaling, &transport, candidates, error);
    } else if (media_type == kMediaTypeData) {  // Data media type.
      if (cricket::IsDtlsSctp(protocol)) {
        ***
        }
        if (!ParseContent(message, cricket::MEDIA_TYPE_DATA, mline_index,
                          protocol, payload_types, pos, &content_name,
                          &bundle_only, &section_msid_signaling,
                          data_desc.get(), &transport, candidates, error)) {
          return false;
        }
        **
      } else if (cricket::IsRtpProtocol(protocol)) {
        // RTP
        std::unique_ptr<RtpDataContentDescription> data_desc =
            ParseContentDescription<RtpDataContentDescription>***
      } else {
        return ParseFailed(line, "Unsupported protocol for media type", error);
      }
    } else {
      RTC_LOG(LS_WARNING) << "Unsupported media type: " << line;
      auto unsupported_desc =
          std::make_unique<UnsupportedContentDescription>(media_type);
      if (!ParseContent(message, cricket::MEDIA_TYPE_UNSUPPORTED, mline_index,
                        protocol, payload_types, pos, &content_name,
                        &bundle_only, &section_msid_signaling,
                        unsupported_desc.get(), &transport, candidates,
                        error)) {
        return false;
      }
    ***
    bool content_rejected = false;
    // A port of 0 is not interpreted as a rejected m= section when it's
    // used along with a=bundle-only.
    if (bundle_only) {
      if (!port_rejected) {
        // Usage of bundle-only with a nonzero port is unspecified. So just
        // ignore bundle-only if we see this.
        bundle_only = false;
        RTC_LOG(LS_WARNING)
            << "a=bundle-only attribute observed with a nonzero "
               "port; this usage is unspecified so the attribute is being "
               "ignored.";
      }
    } else {
      // If not using bundle-only, interpret port 0 in the normal way; the m=
      // section is being rejected.
      content_rejected = port_rejected;
    }
    ***
      for (size_t i = 0; i < session_extmaps.size(); ++i) {
        content->AddRtpHeaderExtension(session_extmaps[i]);
      }
    } else if (content->as_sctp()) {
      // Do nothing, it's OK
    } else {
      RTC_LOG(LS_WARNING) << "Parse failed with unknown protocol " << protocol;
      return false;
    }
    // Use the session level connection address if the media level addresses are
    // not specified.
    rtc::SocketAddress address;
    address = content->connection_address().IsNil()
                  ? session_connection_addr
                  : content->connection_address();
    address.SetPort(port);
    content->set_connection_address(address);
    desc->AddContent(content_name,
                     cricket::IsDtlsSctp(protocol) ? MediaProtocolType::kSctp
                                                   : MediaProtocolType::kRtp,
                     content_rejected, bundle_only, std::move(content));
    // Create TransportInfo with the media level "ice-pwd" and "ice-ufrag".
    desc->AddTransportInfo(TransportInfo(content_name, transport));
  }
  desc->set_msid_signaling(msid_signaling);
  **
  return true;
}
查看payload_types
后面以media_type == kMediaTypeAudio为例子,看音频类型的解析content
content = ParseContentDescription<AudioContentDescription>(
message, cricket::MEDIA_TYPE_AUDIO, mline_index, protocol,
payload_types, pos, &content_name, &bundle_only,
&section_msid_signaling, &transport, candidates, error);
ParseContentDescription
// Creates a content description of type C for one m= section and fills it in
// through ParseContent(). Returns nullptr when ParseContent() fails.
//
// After a successful parse the codecs of the description are re-ordered to
// follow |payload_types| (the m-line fmt list): codecs whose id appears
// earlier in the list sort first, and codecs whose id is not listed sort
// after all listed ones.
template <class C>
static std::unique_ptr<C> ParseContentDescription(
    const std::string& message,
    const cricket::MediaType media_type,
    int mline_index,
    const std::string& protocol,
    const std::vector<int>& payload_types,
    size_t* pos,
    std::string* content_name,
    bool* bundle_only,
    int* msid_signaling,
    TransportDescription* transport,
    std::vector<std::unique_ptr<JsepIceCandidate>>* candidates,
    webrtc::SdpParseError* error) {
  auto description = std::make_unique<C>();
  description->set_extmap_allow_mixed_enum(MediaContentDescription::kNo);

  const bool parsed = ParseContent(
      message, media_type, mline_index, protocol, payload_types, pos,
      content_name, bundle_only, msid_signaling, description.get(), transport,
      candidates, error);
  if (!parsed) {
    return nullptr;
  }

  // Give every listed payload type a strictly positive rank, highest for the
  // first fmt entry. Unlisted payload types fall back to the map's default of
  // 0 when looked up below, which ranks them last.
  std::unordered_map<int, int> rank_by_payload_type;
  const int top_rank = static_cast<int>(payload_types.size() + 1);
  for (size_t i = 0; i < payload_types.size(); ++i) {
    rank_by_payload_type[payload_types[i]] = top_rank - static_cast<int>(i);
  }

  const auto higher_rank_first =
      [&rank_by_payload_type](const typename C::CodecType& lhs,
                              const typename C::CodecType& rhs) {
        return rank_by_payload_type[lhs.id] > rank_by_payload_type[rhs.id];
      };

  std::vector<typename C::CodecType> ordered_codecs = description->codecs();
  absl::c_sort(ordered_codecs, higher_rank_first);
  description->set_codecs(ordered_codecs);
  return description;
}
ParseContent
bool ParseContent(const std::string& message,
const cricket::MediaType media_type,
int mline_index,
const std::string& protocol,
const std::vector<int>& payload_types,
size_t* pos,
std::string* content_name,
bool* bundle_only,
int* msid_signaling,
MediaContentDescription* media_desc,
TransportDescription* transport,
std::vector<std::unique_ptr<JsepIceCandidate>>* candidates,
SdpParseError* error) {
RTC_DCHECK(media_desc != NULL);
RTC_DCHECK(content_name != NULL);
RTC_DCHECK(transport != NULL);
if (media_type == cricket::MEDIA_TYPE_AUDIO) {
MaybeCreateStaticPayloadAudioCodecs(payload_types, media_desc->as_audio());
}
// The media level "ice-ufrag" and "ice-pwd".
// The candidates before update the media level "ice-pwd" and "ice-ufrag".
Candidates candidates_orig;
std::string line;
std::string mline_id;
// Tracks created out of the ssrc attributes.
StreamParamsVec tracks;
SsrcInfoVec ssrc_infos;
SsrcGroupVec ssrc_groups;
std::string maxptime_as_string;
std::string ptime_as_string;
std::vector<std::string> stream_ids;
std::string track_id;
SdpSerializer deserializer;
std::vector<RidDescription> rids;
SimulcastDescription simulcast;
// Loop until the next m line
// 循环直到下一个m行
while (!IsLineType(message, kLineTypeMedia, *pos)) {
***
}
// Remove duplicate or inconsistent rids.
RemoveInvalidRidDescriptions(payload_types, &rids);
// If simulcast is specifed, split the rids into send and receive.
// Rids that do not appear in simulcast attribute will be removed.
// If it is not specified, we assume that all rids are for send layers.
std::vector<RidDescription> send_rids;
std::vector<RidDescription> receive_rids;
if (!simulcast.empty()) {
// Verify that the rids in simulcast match rids in sdp.
RemoveInvalidRidsFromSimulcast(rids, &simulcast);
// Use simulcast description to figure out Send / Receive RIDs.
std::map<std::string, RidDescription> rid_map;
for (const RidDescription& rid : rids) {
rid_map[rid.rid] = rid;
}
for (const auto& layer : simulcast.send_layers().GetAllLayers()) {
auto iter = rid_map.find(layer.rid);
RTC_DCHECK(iter != rid_map.end());
send_rids.push_back(iter->second);
}
for (const auto& layer : simulcast.receive_layers().GetAllLayers()) {
auto iter = rid_map.find(layer.rid);
RTC_DCHECK(iter != rid_map.end());
receive_rids.push_back(iter->second);
}
media_desc->set_simulcast_description(simulcast);
} else {
send_rids = rids;
}
media_desc->set_receive_rids(receive_rids);
// Create tracks from the |ssrc_infos|.
// If the stream_id/track_id for all SSRCS are identical, one StreamParams
// will be created in CreateTracksFromSsrcInfos, containing all the SSRCs from
// the m= section.
if (!ssrc_infos.empty()) {
CreateTracksFromSsrcInfos(ssrc_infos, stream_ids, track_id, &tracks,
*msid_signaling);
} else if (media_type != cricket::MEDIA_TYPE_DATA &&
(*msid_signaling & cricket::kMsidSignalingMediaSection)) {
// If the stream_ids/track_id was signaled but SSRCs were unsignaled we
// still create a track. This isn't done for data media types because
// StreamParams aren't used for SCTP streams, and RTP data channels don't
// support unsignaled SSRCs.
CreateTrackWithNoSsrcs(stream_ids, track_id, send_rids, &tracks);
}
// Add the ssrc group to the track.
for (const SsrcGroup& ssrc_group : ssrc_groups) {
if (ssrc_group.ssrcs.empty()) {
continue;
}
uint32_t ssrc = ssrc_group.ssrcs.front();
for (StreamParams& track : tracks) {
if (track.has_ssrc(ssrc)) {
track.ssrc_groups.push_back(ssrc_group);
}
}
}
// Add the new tracks to the |media_desc|.
for (StreamParams& track : tracks) {
media_desc->AddStream(track);
}
if (media_type == cricket::MEDIA_TYPE_AUDIO) {
AudioContentDescription* audio_desc = media_desc->as_audio();
UpdateFromWildcardCodecs(audio_desc);
// Verify audio codec ensures that no audio codec has been populated with
// only fmtp.
if (!VerifyAudioCodecs(audio_desc)) {
return ParseFailed("Failed to parse audio codecs correctly.", error);
}
AddAudioAttribute(kCodecParamMaxPTime, maxptime_as_string, audio_desc);
AddAudioAttribute(kCodecParamPTime, ptime_as_string, audio_desc);
}
if (media_type == cricket::MEDIA_TYPE_VIDEO) {
VideoContentDescription* video_desc = media_desc->as_video();
UpdateFromWildcardCodecs(video_desc);
// Verify video codec ensures that no video codec has been populated with
// only rtcp-fb.
if (!VerifyVideoCodecs(video_desc)) {
return ParseFailed("Failed to parse video codecs correctly.", error);
}
}
// RFC 5245
// Update the candidates with the media level "ice-pwd" and "ice-ufrag".
for (Candidate& candidate : candidates_orig) {
RTC_DCHECK(candidate.username().empty() ||
candidate.username() == transport->ice_ufrag);
candidate.set_username(transport->ice_ufrag);
RTC_DCHECK(candidate.password().empty());
candidate.set_password(transport->ice_pwd);
candidates->push_back(
std::make_unique<JsepIceCandidate>(mline_id, mline_index, candidate));
}
return true;