总体流程image.png

代码分析

AudioTransportImpl::RecordedDataIsAvailable

  1. // Not used in Chromium. Process captured audio and distribute to all sending
  2. // streams, and try to do this at the lowest possible sample rate.
  3. int32_t AudioTransportImpl::RecordedDataIsAvailable(
  4. const void* audio_data,
  5. const size_t number_of_frames,
  6. const size_t bytes_per_sample,
  7. const size_t number_of_channels,
  8. const uint32_t sample_rate,
  9. const uint32_t audio_delay_milliseconds,
  10. const int32_t /*clock_drift*/,
  11. const uint32_t /*volume*/,
  12. const bool key_pressed,
  13. uint32_t& /*new_mic_volume*/) { // NOLINT: to avoid changing APIs
  14. RTC_DCHECK(audio_data);
  15. RTC_DCHECK_GE(number_of_channels, 1);
  16. RTC_DCHECK_LE(number_of_channels, 2);
  17. RTC_DCHECK_EQ(2 * number_of_channels, bytes_per_sample);
  18. RTC_DCHECK_GE(sample_rate, AudioProcessing::NativeRate::kSampleRate8kHz);
  19. // 100 = 1 second / data duration (10 ms).
  20. RTC_DCHECK_EQ(number_of_frames * 100, sample_rate);
  21. RTC_DCHECK_LE(bytes_per_sample * number_of_frames * number_of_channels,
  22. AudioFrame::kMaxDataSizeBytes);
  23. int send_sample_rate_hz = 0;
  24. size_t send_num_channels = 0;
  25. bool swap_stereo_channels = false;
  26. {
  27. MutexLock lock(&capture_lock_);
  28. send_sample_rate_hz = send_sample_rate_hz_;
  29. send_num_channels = send_num_channels_;
  30. swap_stereo_channels = swap_stereo_channels_;
  31. }
  32. std::unique_ptr<AudioFrame> audio_frame(new AudioFrame());
  33. InitializeCaptureFrame(sample_rate, send_sample_rate_hz, number_of_channels,
  34. send_num_channels, audio_frame.get());
  35. voe::RemixAndResample(static_cast<const int16_t*>(audio_data),
  36. number_of_frames, number_of_channels, sample_rate,
  37. &capture_resampler_, audio_frame.get());
  38. ProcessCaptureFrame(audio_delay_milliseconds, key_pressed,
  39. swap_stereo_channels, audio_processing_,
  40. audio_frame.get());
  41. // Typing detection (utilizes the APM/VAD decision). We let the VAD determine
  42. // if we're using this feature or not.
  43. // TODO(solenberg): GetConfig() takes a lock. Work around that.
  44. bool typing_detected = false;
  45. if (audio_processing_ &&
  46. audio_processing_->GetConfig().voice_detection.enabled) {
  47. if (audio_frame->vad_activity_ != AudioFrame::kVadUnknown) {
  48. bool vad_active = audio_frame->vad_activity_ == AudioFrame::kVadActive;
  49. typing_detected = typing_detection_.Process(key_pressed, vad_active);
  50. }
  51. }
  52. // Copy frame and push to each sending stream. The copy is required since an
  53. // encoding task will be posted internally to each stream.
  54. {
  55. MutexLock lock(&capture_lock_);
  56. typing_noise_detected_ = typing_detected;
  57. }
  58. RTC_DCHECK_GT(audio_frame->samples_per_channel_, 0);
  59. if (async_audio_processing_)
  60. async_audio_processing_->Process(std::move(audio_frame));
  61. else
  62. SendProcessedData(std::move(audio_frame));
  63. return 0;
  64. }

image.png
—》

AudioTransportImpl::SendProcessedData

  1. void AudioTransportImpl::SendProcessedData(
  2. std::unique_ptr<AudioFrame> audio_frame) {
  3. RTC_DCHECK_GT(audio_frame->samples_per_channel_, 0);
  4. MutexLock lock(&capture_lock_);
  5. if (audio_senders_.empty())
  6. return;
  7. auto it = audio_senders_.begin();
  8. while (++it != audio_senders_.end()) {
  9. auto audio_frame_copy = std::make_unique<AudioFrame>();
  10. audio_frame_copy->CopyFrom(*audio_frame);
  11. (*it)->SendAudioData(std::move(audio_frame_copy));
  12. }
  13. // Send the original frame to the first stream w/o copying.
  14. (*audio_senders_.begin())->SendAudioData(std::move(audio_frame));
  15. }

—》

AudioSendStream::SendAudioData

  1. void AudioSendStream::SendAudioData(std::unique_ptr<AudioFrame> audio_frame) {
  2. RTC_CHECK_RUNS_SERIALIZED(&audio_capture_race_checker_);
  3. RTC_DCHECK_GT(audio_frame->sample_rate_hz_, 0);
  4. double duration = static_cast<double>(audio_frame->samples_per_channel_) /
  5. audio_frame->sample_rate_hz_;
  6. {
  7. // Note: SendAudioData() passes the frame further down the pipeline and it
  8. // may eventually get sent. But this method is invoked even if we are not
  9. // connected, as long as we have an AudioSendStream (created as a result of
  10. // an O/A exchange). This means that we are calculating audio levels whether
  11. // or not we are sending samples.
  12. // TODO(https://crbug.com/webrtc/10771): All "media-source" related stats
  13. // should move from send-streams to the local audio sources or tracks; a
  14. // send-stream should not be required to read the microphone audio levels.
  15. MutexLock lock(&audio_level_lock_);
  16. audio_level_.ComputeLevel(*audio_frame, duration);
  17. }
  18. channel_send_->ProcessAndEncodeAudio(std::move(audio_frame));
  19. }

—》开始跟编码层打交道了, channelsend->ProcessAndEncodeAudio(std::move(audio_frame));

ChannelSend::ProcessAndEncodeAudio

  1. void ChannelSend::ProcessAndEncodeAudio(
  2. std::unique_ptr<AudioFrame> audio_frame) {
  3. RTC_DCHECK_RUNS_SERIALIZED(&audio_thread_race_checker_);
  4. RTC_DCHECK_GT(audio_frame->samples_per_channel_, 0);
  5. RTC_DCHECK_LE(audio_frame->num_channels_, 8);
  6. // Profile time between when the audio frame is added to the task queue and
  7. // when the task is actually executed.
  8. audio_frame->UpdateProfileTimeStamp();
  9. // 添加一个任务到encoder_queue_,然后触发编码线程来处理
  10. encoder_queue_.PostTask(
  11. [this, audio_frame = std::move(audio_frame)]() mutable {
  12. RTC_DCHECK_RUN_ON(&encoder_queue_);
  13. if (!encoder_queue_is_active_) {
  14. if (fixing_timestamp_stall_) {
  15. _timeStamp +=
  16. static_cast<uint32_t>(audio_frame->samples_per_channel_);
  17. }
  18. return;
  19. }
  20. // Measure time between when the audio frame is added to the task queue
  21. // and when the task is actually executed. Goal is to keep track of
  22. // unwanted extra latency added by the task queue.
  23. RTC_HISTOGRAM_COUNTS_10000("WebRTC.Audio.EncodingTaskQueueLatencyMs",
  24. audio_frame->ElapsedProfileTimeMs());
  25. // 根据输入mute属性,修改数据
  26. bool is_muted = InputMute();
  27. AudioFrameOperations::Mute(audio_frame.get(), previous_frame_muted_,
  28. is_muted);
  29. if (_includeAudioLevelIndication) {
  30. size_t length =
  31. audio_frame->samples_per_channel_ * audio_frame->num_channels_;
  32. RTC_CHECK_LE(length, AudioFrame::kMaxDataSizeBytes);
  33. if (is_muted && previous_frame_muted_) {
  34. rms_level_.AnalyzeMuted(length);
  35. } else {
  36. rms_level_.Analyze(
  37. rtc::ArrayView<const int16_t>(audio_frame->data(), length));
  38. }
  39. }
  40. previous_frame_muted_ = is_muted;
  41. // Add 10ms of raw (PCM) audio data to the encoder @ 32kHz.
  42. // The ACM resamples internally.
  43. audio_frame->timestamp_ = _timeStamp;
  44. // This call will trigger AudioPacketizationCallback::SendData if
  45. // encoding is done and payload is ready for packetization and
  46. // transmission. Otherwise, it will return without invoking the
  47. // callback.
  48. if (audio_coding_->Add10MsData(*audio_frame) < 0) {
  49. RTC_DLOG(LS_ERROR) << "ACM::Add10MsData() failed.";
  50. return;
  51. }
  52. _timeStamp += static_cast<uint32_t>(audio_frame->samples_per_channel_);
  53. });
  54. }

—》

AudioCodingModuleImpl::Add10MsData

  1. // Add 10MS of raw (PCM) audio data to the encoder.
  2. int AudioCodingModuleImpl::Add10MsData(const AudioFrame& audio_frame) {
  3. MutexLock lock(&acm_mutex_);
  4. int r = Add10MsDataInternal(audio_frame, &input_data_);
  5. // TODO(bugs.webrtc.org/10739): add dcheck that
  6. // |audio_frame.absolute_capture_timestamp_ms()| always has a value.
  7. return r < 0
  8. ? r
  9. : Encode(input_data_, audio_frame.absolute_capture_timestamp_ms());
  10. }

—》调用Encode对数据开始编码
AudioCodingModuleImpl::Encode

  1. int32_t AudioCodingModuleImpl::Encode(
  2. const InputData& input_data,
  3. absl::optional<int64_t> absolute_capture_timestamp_ms) {
  4. // TODO(bugs.webrtc.org/10739): add dcheck that
  5. // |audio_frame.absolute_capture_timestamp_ms()| always has a value.
  6. AudioEncoder::EncodedInfo encoded_info;
  7. uint8_t previous_pltype;
  8. // Check if there is an encoder before.
  9. if (!HaveValidEncoder("Process"))
  10. return -1;
  11. if (!first_frame_) {
  12. RTC_DCHECK(IsNewerTimestamp(input_data.input_timestamp, last_timestamp_))
  13. << "Time should not move backwards";
  14. }
  15. // Scale the timestamp to the codec's RTP timestamp rate.
  16. uint32_t rtp_timestamp =
  17. first_frame_
  18. ? input_data.input_timestamp
  19. : last_rtp_timestamp_ +
  20. rtc::dchecked_cast<uint32_t>(rtc::CheckedDivExact(
  21. int64_t{input_data.input_timestamp - last_timestamp_} *
  22. encoder_stack_->RtpTimestampRateHz(),
  23. int64_t{encoder_stack_->SampleRateHz()}));
  24. last_timestamp_ = input_data.input_timestamp;
  25. last_rtp_timestamp_ = rtp_timestamp;
  26. first_frame_ = false;
  27. // Clear the buffer before reuse - encoded data will get appended.
  28. encode_buffer_.Clear();
  29. encoded_info = encoder_stack_->Encode(
  30. rtp_timestamp,
  31. rtc::ArrayView<const int16_t>(
  32. input_data.audio,
  33. input_data.audio_channel * input_data.length_per_channel),
  34. &encode_buffer_);
  35. bitrate_logger_.MaybeLog(encoder_stack_->GetTargetBitrate() / 1000);
  36. // 采集是每10ms,但是编码是20ms,所以需要第二次才会开始编码
  37. if (encode_buffer_.size() == 0 && !encoded_info.send_even_if_empty) {
  38. // Not enough data.
  39. return 0; // 第一次进来会为0,第一次的数据会缓存到编码器中
  40. }
  41. previous_pltype = previous_pltype_; // Read it while we have the critsect.
  42. // Log codec type to histogram once every 500 packets.
  43. if (encoded_info.encoded_bytes == 0) {
  44. ++number_of_consecutive_empty_packets_;
  45. } else {
  46. size_t codec_type = static_cast<size_t>(encoded_info.encoder_type);
  47. codec_histogram_bins_log_[codec_type] +=
  48. number_of_consecutive_empty_packets_ + 1;
  49. number_of_consecutive_empty_packets_ = 0;
  50. if (codec_histogram_bins_log_[codec_type] >= 500) {
  51. codec_histogram_bins_log_[codec_type] -= 500;
  52. UpdateCodecTypeHistogram(codec_type);
  53. }
  54. }
  55. AudioFrameType frame_type;
  56. if (encode_buffer_.size() == 0 && encoded_info.send_even_if_empty) {
  57. frame_type = AudioFrameType::kEmptyFrame;
  58. encoded_info.payload_type = previous_pltype;
  59. } else {
  60. RTC_DCHECK_GT(encode_buffer_.size(), 0);
  61. frame_type = encoded_info.speech ? AudioFrameType::kAudioFrameSpeech
  62. : AudioFrameType::kAudioFrameCN;
  63. }
  64. {
  65. MutexLock lock(&callback_mutex_);
  66. if (packetization_callback_) { // 触发ChannelSend::SendData
  67. packetization_callback_->SendData(
  68. frame_type, encoded_info.payload_type, encoded_info.encoded_timestamp,
  69. encode_buffer_.data(), encode_buffer_.size(),
  70. absolute_capture_timestamp_ms.value_or(-1));
  71. }
  72. }
  73. previous_pltype_ = encoded_info.payload_type;
  74. return static_cast<int32_t>(encode_buffer_.size());
  75. }

—》

ChannelSend::SendData

  1. int32_t
  2. (AudioFrameType frameType,
  3. uint8_t payloadType,
  4. uint32_t rtp_timestamp,
  5. const uint8_t* payloadData,
  6. size_t payloadSize,
  7. int64_t absolute_capture_timestamp_ms) {
  8. RTC_DCHECK_RUN_ON(&encoder_queue_);
  9. rtc::ArrayView<const uint8_t> payload(payloadData, payloadSize);
  10. if (frame_transformer_delegate_) {
  11. // Asynchronously transform the payload before sending it. After the payload
  12. // is transformed, the delegate will call SendRtpAudio to send it.
  13. frame_transformer_delegate_->Transform(
  14. frameType, payloadType, rtp_timestamp, rtp_rtcp_->StartTimestamp(),
  15. payloadData, payloadSize, absolute_capture_timestamp_ms,
  16. rtp_rtcp_->SSRC());
  17. return 0;
  18. }
  19. return SendRtpAudio(frameType, payloadType, rtp_timestamp, payload,
  20. absolute_capture_timestamp_ms);
  21. }

—》

ChannelSend::SendRtpAudio

  1. int32_t ChannelSend::SendRtpAudio(AudioFrameType frameType,
  2. uint8_t payloadType,
  3. uint32_t rtp_timestamp,
  4. rtc::ArrayView<const uint8_t> payload,
  5. int64_t absolute_capture_timestamp_ms) {
  6. if (_includeAudioLevelIndication) {
  7. // Store current audio level in the RTP sender.
  8. // The level will be used in combination with voice-activity state
  9. // (frameType) to add an RTP header extension
  10. rtp_sender_audio_->SetAudioLevel(rms_level_.Average());
  11. }
  12. // E2EE Custom Audio Frame Encryption (This is optional).
  13. // Keep this buffer around for the lifetime of the send call.
  14. rtc::Buffer encrypted_audio_payload;
  15. // We don't invoke encryptor if payload is empty, which means we are to send
  16. // DTMF, or the encoder entered DTX.
  17. // TODO(minyue): see whether DTMF packets should be encrypted or not. In
  18. // current implementation, they are not.
  19. if (!payload.empty()) {
  20. if (frame_encryptor_ != nullptr) {
  21. // TODO(benwright@webrtc.org) - Allocate enough to always encrypt inline.
  22. // Allocate a buffer to hold the maximum possible encrypted payload.
  23. size_t max_ciphertext_size = frame_encryptor_->GetMaxCiphertextByteSize(
  24. cricket::MEDIA_TYPE_AUDIO, payload.size());
  25. encrypted_audio_payload.SetSize(max_ciphertext_size);
  26. // Encrypt the audio payload into the buffer.
  27. size_t bytes_written = 0;
  28. int encrypt_status = frame_encryptor_->Encrypt(
  29. cricket::MEDIA_TYPE_AUDIO, rtp_rtcp_->SSRC(),
  30. /*additional_data=*/nullptr, payload, encrypted_audio_payload,
  31. &bytes_written);
  32. if (encrypt_status != 0) {
  33. RTC_DLOG(LS_ERROR)
  34. << "Channel::SendData() failed encrypt audio payload: "
  35. << encrypt_status;
  36. return -1;
  37. }
  38. // Resize the buffer to the exact number of bytes actually used.
  39. encrypted_audio_payload.SetSize(bytes_written);
  40. // Rewrite the payloadData and size to the new encrypted payload.
  41. payload = encrypted_audio_payload;
  42. } else if (crypto_options_.sframe.require_frame_encryption) {
  43. RTC_DLOG(LS_ERROR) << "Channel::SendData() failed sending audio payload: "
  44. "A frame encryptor is required but one is not set.";
  45. return -1;
  46. }
  47. }
  48. // Push data from ACM to RTP/RTCP-module to deliver audio frame for
  49. // packetization.
  50. if (!rtp_rtcp_->OnSendingRtpFrame(rtp_timestamp,
  51. // Leaving the time when this frame was
  52. // received from the capture device as
  53. // undefined for voice for now.
  54. -1, payloadType,
  55. /*force_sender_report=*/false)) {
  56. return -1;
  57. }
  58. // RTCPSender has it's own copy of the timestamp offset, added in
  59. // RTCPSender::BuildSR, hence we must not add the in the offset for the above
  60. // call.
  61. // TODO(nisse): Delete RTCPSender:timestamp_offset_, and see if we can confine
  62. // knowledge of the offset to a single place.
  63. // This call will trigger Transport::SendPacket() from the RTP/RTCP module.
  64. if (!rtp_sender_audio_->SendAudio(
  65. frameType, payloadType, rtp_timestamp + rtp_rtcp_->StartTimestamp(),
  66. payload.data(), payload.size(), absolute_capture_timestamp_ms)) {
  67. RTC_DLOG(LS_ERROR)
  68. << "ChannelSend::SendData() failed to send data to RTP/RTCP module";
  69. return -1;
  70. }
  71. return 0;
  72. }

—》

RTPSenderAudio::SendAudio

  1. bool RTPSenderAudio::SendAudio(AudioFrameType frame_type,
  2. int8_t payload_type,
  3. uint32_t rtp_timestamp,
  4. const uint8_t* payload_data,
  5. size_t payload_size,
  6. int64_t absolute_capture_timestamp_ms) {
  7. #if RTC_TRACE_EVENTS_ENABLED
  8. TRACE_EVENT_ASYNC_STEP1("webrtc", "Audio", rtp_timestamp, "Send", "type",
  9. FrameTypeToString(frame_type));
  10. #endif
  11. *****
  12. // 创建一个rtp包,并且填入数据
  13. std::unique_ptr<RtpPacketToSend> packet = rtp_sender_->AllocatePacket();
  14. packet->SetMarker(MarkerBit(frame_type, payload_type));
  15. packet->SetPayloadType(payload_type);
  16. packet->SetTimestamp(rtp_timestamp);
  17. packet->set_capture_time_ms(clock_->TimeInMilliseconds());
  18. // Update audio level extension, if included.
  19. packet->SetExtension<AudioLevel>(
  20. frame_type == AudioFrameType::kAudioFrameSpeech, audio_level_dbov);
  21. // Send absolute capture time periodically in order to optimize and save
  22. // network traffic. Missing absolute capture times can be interpolated on the
  23. // receiving end if sending intervals are small enough.
  24. auto absolute_capture_time = absolute_capture_time_sender_.OnSendPacket(
  25. AbsoluteCaptureTimeSender::GetSource(packet->Ssrc(), packet->Csrcs()),
  26. packet->Timestamp(),
  27. // Replace missing value with 0 (invalid frequency), this will trigger
  28. // absolute capture time sending.
  29. encoder_rtp_timestamp_frequency.value_or(0),
  30. Int64MsToUQ32x32(absolute_capture_timestamp_ms + NtpOffsetMs()),
  31. /*estimated_capture_clock_offset=*/
  32. include_capture_clock_offset_ ? absl::make_optional(0) : absl::nullopt);
  33. if (absolute_capture_time) {
  34. // It also checks that extension was registered during SDP negotiation. If
  35. // not then setter won't do anything.
  36. packet->SetExtension<AbsoluteCaptureTimeExtension>(*absolute_capture_time);
  37. }
  38. uint8_t* payload = packet->AllocatePayload(payload_size);
  39. if (!payload) // Too large payload buffer.
  40. return false;
  41. memcpy(payload, payload_data, payload_size);
  42. if (!rtp_sender_->AssignSequenceNumber(packet.get()))
  43. return false;
  44. {
  45. MutexLock lock(&send_audio_mutex_);
  46. last_payload_type_ = payload_type;
  47. }
  48. TRACE_EVENT_ASYNC_END2("webrtc", "Audio", rtp_timestamp, "timestamp",
  49. packet->Timestamp(), "seqnum",
  50. packet->SequenceNumber());
  51. packet->set_packet_type(RtpPacketMediaType::kAudio);
  52. packet->set_allow_retransmission(true);
  53. bool send_result = rtp_sender_->SendToNetwork(std::move(packet));
  54. if (first_packet_sent_()) {
  55. RTC_LOG(LS_INFO) << "First audio RTP packet sent to pacer";
  56. }
  57. return send_result;
  58. }

—》

RTPSender::SendToNetwork

  1. bool RTPSender::SendToNetwork(std::unique_ptr<RtpPacketToSend> packet) {
  2. RTC_DCHECK(packet);
  3. int64_t now_ms = clock_->TimeInMilliseconds();
  4. auto packet_type = packet->packet_type();
  5. RTC_CHECK(packet_type) << "Packet type must be set before sending.";
  6. if (packet->capture_time_ms() <= 0) {
  7. packet->set_capture_time_ms(now_ms);
  8. }
  9. std::vector<std::unique_ptr<RtpPacketToSend>> packets;
  10. packets.emplace_back(std::move(packet));
  11. // 将packets插入到pace的队列中
  12. paced_sender_->EnqueuePackets(std::move(packets));
  13. return true;
  14. }

—>

RtpPacketSenderProxy::EnqueuePackets

  1. class RtpPacketSenderProxy : public RtpPacketSender {
  2. public:
  3. void EnqueuePackets(
  4. std::vector<std::unique_ptr<RtpPacketToSend>> packets) override {
  5. MutexLock lock(&mutex_);
  6. rtp_packet_pacer_->EnqueuePackets(std::move(packets));
  7. }
  8. ***

—>

PacedSender::EnqueuePackets

  1. void PacedSender::EnqueuePackets(
  2. std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
  3. {
  4. TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
  5. "PacedSender::EnqueuePackets");
  6. MutexLock lock(&mutex_);
  7. for (auto& packet : packets) {
  8. TRACE_EVENT2(TRACE_DISABLED_BY_DEFAULT("webrtc"),
  9. "PacedSender::EnqueuePackets::Loop", "sequence_number",
  10. packet->SequenceNumber(), "rtp_timestamp",
  11. packet->Timestamp());
  12. RTC_DCHECK_GE(packet->capture_time_ms(), 0);
  13. pacing_controller_.EnqueuePacket(std::move(packet));
  14. }
  15. }
  16. MaybeWakupProcessThread();
  17. }

—》

PacingController::EnqueuePacket

H:\webrtc-20210315\webrtc-20210315\webrtc\webrtc-checkout\src\modules\pacing\pacing_controller.cc

  1. void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
  2. RTC_DCHECK(pacing_bitrate_ > DataRate::Zero())
  3. << "SetPacingRate must be called before InsertPacket.";
  4. RTC_CHECK(packet->packet_type());
  5. // Get priority first and store in temporary, to avoid chance of object being
  6. // moved before GetPriorityForType() being called.
  7. const int priority = GetPriorityForType(*packet->packet_type());
  8. EnqueuePacketInternal(std::move(packet), priority);
  9. }

image.png
—》

PacingController::EnqueuePacketInternal

  1. void PacingController::EnqueuePacketInternal(
  2. std::unique_ptr<RtpPacketToSend> packet,
  3. int priority) {
  4. prober_.OnIncomingPacket(DataSize::Bytes(packet->payload_size()));
  5. Timestamp now = CurrentTime();
  6. if (mode_ == ProcessMode::kDynamic && packet_queue_.Empty() &&
  7. NextSendTime() <= now) {
  8. TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now);
  9. UpdateBudgetWithElapsedTime(elapsed_time);
  10. }
  11. packet_queue_.Push(priority, now, packet_counter_++, std::move(packet));
  12. }

最后是通过PacingController::GetPendingPacket来获取添加的packet数据然后发出去。

PacingController::GetPendingPacket

  1. std::unique_ptr<RtpPacketToSend> PacingController::GetPendingPacket(
  2. const PacedPacketInfo& pacing_info,
  3. Timestamp target_send_time,
  4. Timestamp now) {
  5. if (packet_queue_.Empty()) {
  6. return nullptr;
  7. }
  8. // First, check if there is any reason _not_ to send the next queued packet.
  9. // Unpaced audio packets and probes are exempted from send checks.
  10. bool unpaced_audio_packet =
  11. !pace_audio_ && packet_queue_.LeadingAudioPacketEnqueueTime().has_value();
  12. bool is_probe = pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe;
  13. if (!unpaced_audio_packet && !is_probe) {
  14. if (Congested()) {
  15. // Don't send anything if congested.
  16. return nullptr;
  17. }
  18. if (mode_ == ProcessMode::kPeriodic) {
  19. if (media_budget_.bytes_remaining() <= 0) {
  20. // Not enough budget.
  21. return nullptr;
  22. }
  23. } else {
  24. // Dynamic processing mode.
  25. if (now <= target_send_time) {
  26. // We allow sending slightly early if we think that we would actually
  27. // had been able to, had we been right on time - i.e. the current debt
  28. // is not more than would be reduced to zero at the target sent time.
  29. TimeDelta flush_time = media_debt_ / media_rate_;
  30. if (now + flush_time > target_send_time) {
  31. return nullptr;
  32. }
  33. }
  34. }
  35. }
  36. return packet_queue_.Pop();
  37. }

image.png
可以看到是PacingController::ProcessPackets中调用了获取PacingController::GetPendingPacket数据。

PacingController::ProcessPackets

  1. void PacingController::ProcessPackets() {
  2. Timestamp now = CurrentTime();
  3. Timestamp target_send_time = now;
  4. if (mode_ == ProcessMode::kDynamic) {
  5. target_send_time = NextSendTime();
  6. TimeDelta early_execute_margin =
  7. prober_.is_probing() ? kMaxEarlyProbeProcessing : TimeDelta::Zero();
  8. if (target_send_time.IsMinusInfinity()) {
  9. target_send_time = now;
  10. } else if (now < target_send_time - early_execute_margin) {
  11. // We are too early, but if queue is empty still allow draining some debt.
  12. // Probing is allowed to be sent up to kMinSleepTime early.
  13. TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now);
  14. UpdateBudgetWithElapsedTime(elapsed_time);
  15. return;
  16. }
  17. if (target_send_time < last_process_time_) {
  18. // After the last process call, at time X, the target send time
  19. // shifted to be earlier than X. This should normally not happen
  20. // but we want to make sure rounding errors or erratic behavior
  21. // of NextSendTime() does not cause issue. In particular, if the
  22. // buffer reduction of
  23. // rate * (target_send_time - previous_process_time)
  24. // in the main loop doesn't clean up the existing debt we may not
  25. // be able to send again. We don't want to check this reordering
  26. // there as it is the normal exit condtion when the buffer is
  27. // exhausted and there are packets in the queue.
  28. UpdateBudgetWithElapsedTime(last_process_time_ - target_send_time);
  29. target_send_time = last_process_time_;
  30. }
  31. }
  32. Timestamp previous_process_time = last_process_time_;
  33. TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now);
  34. if (ShouldSendKeepalive(now)) {
  35. // We can not send padding unless a normal packet has first been sent. If
  36. // we do, timestamps get messed up.
  37. if (packet_counter_ == 0) {
  38. last_send_time_ = now;
  39. } else {
  40. DataSize keepalive_data_sent = DataSize::Zero();
  41. std::vector<std::unique_ptr<RtpPacketToSend>> keepalive_packets =
  42. packet_sender_->GeneratePadding(DataSize::Bytes(1));
  43. for (auto& packet : keepalive_packets) {
  44. keepalive_data_sent +=
  45. DataSize::Bytes(packet->payload_size() + packet->padding_size());
  46. packet_sender_->SendPacket(std::move(packet), PacedPacketInfo());
  47. for (auto& packet : packet_sender_->FetchFec()) {
  48. EnqueuePacket(std::move(packet));
  49. }
  50. }
  51. OnPaddingSent(keepalive_data_sent);
  52. }
  53. }
  54. if (paused_) {
  55. return;
  56. }
  57. if (elapsed_time > TimeDelta::Zero()) {
  58. DataRate target_rate = pacing_bitrate_;
  59. DataSize queue_size_data = packet_queue_.Size();
  60. if (queue_size_data > DataSize::Zero()) {
  61. // Assuming equal size packets and input/output rate, the average packet
  62. // has avg_time_left_ms left to get queue_size_bytes out of the queue, if
  63. // time constraint shall be met. Determine bitrate needed for that.
  64. packet_queue_.UpdateQueueTime(now);
  65. if (drain_large_queues_) {
  66. TimeDelta avg_time_left =
  67. std::max(TimeDelta::Millis(1),
  68. queue_time_limit - packet_queue_.AverageQueueTime());
  69. DataRate min_rate_needed = queue_size_data / avg_time_left;
  70. if (min_rate_needed > target_rate) {
  71. target_rate = min_rate_needed;
  72. RTC_LOG(LS_VERBOSE) << "bwe:large_pacing_queue pacing_rate_kbps="
  73. << target_rate.kbps();
  74. }
  75. }
  76. }
  77. if (mode_ == ProcessMode::kPeriodic) {
  78. // In periodic processing mode, the IntevalBudget allows positive budget
  79. // up to (process interval duration) * (target rate), so we only need to
  80. // update it once before the packet sending loop.
  81. media_budget_.set_target_rate_kbps(target_rate.kbps());
  82. UpdateBudgetWithElapsedTime(elapsed_time);
  83. } else {
  84. media_rate_ = target_rate;
  85. }
  86. }
  87. bool first_packet_in_probe = false;
  88. PacedPacketInfo pacing_info;
  89. DataSize recommended_probe_size = DataSize::Zero();
  90. bool is_probing = prober_.is_probing();
  91. if (is_probing) {
  92. // Probe timing is sensitive, and handled explicitly by BitrateProber, so
  93. // use actual send time rather than target.
  94. pacing_info = prober_.CurrentCluster(now).value_or(PacedPacketInfo());
  95. if (pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe) {
  96. first_packet_in_probe = pacing_info.probe_cluster_bytes_sent == 0;
  97. recommended_probe_size = prober_.RecommendedMinProbeSize();
  98. RTC_DCHECK_GT(recommended_probe_size, DataSize::Zero());
  99. } else {
  100. // No valid probe cluster returned, probe might have timed out.
  101. is_probing = false;
  102. }
  103. }
  104. DataSize data_sent = DataSize::Zero();
  105. // The paused state is checked in the loop since it leaves the critical
  106. // section allowing the paused state to be changed from other code.
  107. while (!paused_) {
  108. if (first_packet_in_probe) {
  109. // If first packet in probe, insert a small padding packet so we have a
  110. // more reliable start window for the rate estimation.
  111. auto padding = packet_sender_->GeneratePadding(DataSize::Bytes(1));
  112. // If no RTP modules sending media are registered, we may not get a
  113. // padding packet back.
  114. if (!padding.empty()) {
  115. // Insert with high priority so larger media packets don't preempt it.
  116. EnqueuePacketInternal(std::move(padding[0]), kFirstPriority);
  117. // We should never get more than one padding packets with a requested
  118. // size of 1 byte.
  119. RTC_DCHECK_EQ(padding.size(), 1u);
  120. }
  121. first_packet_in_probe = false;
  122. }
  123. if (mode_ == ProcessMode::kDynamic &&
  124. previous_process_time < target_send_time) {
  125. // Reduce buffer levels with amount corresponding to time between last
  126. // process and target send time for the next packet.
  127. // If the process call is late, that may be the time between the optimal
  128. // send times for two packets we should already have sent.
  129. UpdateBudgetWithElapsedTime(target_send_time - previous_process_time);
  130. previous_process_time = target_send_time;
  131. }
  132. // Fetch the next packet, so long as queue is not empty or budget is not
  133. // exhausted.
  134. std::unique_ptr<RtpPacketToSend> rtp_packet =
  135. GetPendingPacket(pacing_info, target_send_time, now);
  136. if (rtp_packet == nullptr) {
  137. // No packet available to send, check if we should send padding.
  138. DataSize padding_to_add = PaddingToAdd(recommended_probe_size, data_sent);
  139. if (padding_to_add > DataSize::Zero()) {
  140. std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets =
  141. packet_sender_->GeneratePadding(padding_to_add);
  142. if (padding_packets.empty()) {
  143. // No padding packets were generated, quite send loop.
  144. break;
  145. }
  146. for (auto& packet : padding_packets) {
  147. EnqueuePacket(std::move(packet));
  148. }
  149. // Continue loop to send the padding that was just added.
  150. continue;
  151. }
  152. // Can't fetch new packet and no padding to send, exit send loop.
  153. break;
  154. }
  155. RTC_DCHECK(rtp_packet);
  156. RTC_DCHECK(rtp_packet->packet_type().has_value());
  157. const RtpPacketMediaType packet_type = *rtp_packet->packet_type();
  158. DataSize packet_size = DataSize::Bytes(rtp_packet->payload_size() +
  159. rtp_packet->padding_size());
  160. if (include_overhead_) {
  161. packet_size += DataSize::Bytes(rtp_packet->headers_size()) +
  162. transport_overhead_per_packet_;
  163. }
  164. packet_sender_->SendPacket(std::move(rtp_packet), pacing_info);
  165. for (auto& packet : packet_sender_->FetchFec()) {
  166. EnqueuePacket(std::move(packet));
  167. }
  168. data_sent += packet_size;
  169. // Send done, update send/process time to the target send time.
  170. OnPacketSent(packet_type, packet_size, target_send_time);
  171. // If we are currently probing, we need to stop the send loop when we have
  172. // reached the send target.
  173. if (is_probing && data_sent >= recommended_probe_size) {
  174. break;
  175. }
  176. if (mode_ == ProcessMode::kDynamic) {
  177. // Update target send time in case that are more packets that we are late
  178. // in processing.
  179. Timestamp next_send_time = NextSendTime();
  180. if (next_send_time.IsMinusInfinity()) {
  181. target_send_time = now;
  182. } else {
  183. target_send_time = std::min(now, next_send_time);
  184. }
  185. }
  186. }
  187. last_process_time_ = std::max(last_process_time_, previous_process_time);
  188. if (is_probing) {
  189. probing_send_failure_ = data_sent == DataSize::Zero();
  190. if (!probing_send_failure_) {
  191. prober_.ProbeSent(CurrentTime(), data_sent);
  192. }
  193. }
  194. }

里面是调用了packetsender->SendPacket(std::move(packet), PacedPacketInfo());来发送数据。
image.png