前言

  • merge lab3的初始代码,出现如下变化

image.png

通读文档后一些疑问

  • 谁来调用fill_window()?
  • ack_received()需要调用fill_window()吗?
    • ack_received会更新接收方的接收窗口
  • 放入_segments_out的TCPSegment的ackno怎么填?
    • send_empty_segment()的TCPSegment的ackno怎么填?
    • 我觉得都不需要填,由TCPConnection来填,只需要填入seqno
  • 接收窗口为0的逻辑在哪里处理?怎么处理?

定时器相关疑问

  • 定时器第一次启动是什么时候?
    • 每次发送序号空间不为0的段时,就会检查定时器是否启动,如果没有启动就启动
    • 所以第一次启动,是发送SYN报文段后
  • 定时器什么时候会被启动/重新启动
    • 每次发送序号空间不为0的段时,就会检查定时器是否启动,如果没有启动就启动
    • ack_received()被调用后,接收到1个之前从来没有出现过的更大的ackno,且_segments_out未被清空,则启动或重启
      • 我觉得这里应该一定有定时器是开着的,因为_segments_out未被清空,所以一定是重启
      • 注意RTO可能被更新(变回初始值)
    • 定时器超时后,要再次启动
      • 注意RTO可能被更新
  • 定时器怎么确认启动时,当前时刻是多少?定时器什么时候超时?定时器怎么确认超时?
    • 定义一个成员变量,表示毫秒数,每次tick()被调用,更新这个毫秒数
    • 是否可以认为tick()被调用的足够频繁,以至于定时器认为此时的毫秒数变量指示的就是真实的毫秒数?
    • tick()被调用,在tick()方法中发现当前的毫秒数已经超过了定时器设置的超时毫秒数,则定时器超时
    • 定义一个成员变量_timer_ms
    • 当定时器被启动或重启时,_timer_ms置为0
    • 每次tick被调用
      • _timer_ms+=ms_since_last_tick
      • 若_timer_ms>=RTO,则定时器超时
  • 定时器什么时候会被关闭?

    • ack_received()被调用后,_segments_out被清空时,如果当前定时器开着,就关闭
  • 连续重传次数什么时候+1?连续重传次数什么时候清零?TCPSender除了对连续重传次数+1和清零,还需要做别的事情吗?

    • 如果tick()被调用,发现定时器超时,则会重传最早未被确认的报文段,此时连续重传次数+1(文档里说if window size is nonzero???)
    • ack_received()被调用,接收到1个之前从来没有出现过的更大的ackno,则连续重传次数清零
  • RTO什么时候加倍?RTO什么时候恢复初始值?
    • 如果tick()被调用,发现定时器超时,则会重传最早未被确认的报文段,此时RTO加倍(文档里说if window size is nonzero???)
    • ack_received()被调用,接收到1个之前从来没有出现过的更大的ackno,则RTO变回初始值

需要额外设置哪些成员变量?

  • _initial_retransmission_timeout存储的是RTO的初始值,我们还需要存储RTO,_retransmission_timeout
  • 最新的ackno和window_size
  • 注意_segments_out表示的是所有已经封装好,马上要被发送的报文段队列,我们还需要一个额外的数据结构,来保存那些已发送但未被确认的报文段,前者是后者的子集
    • std::queue来保存
  • 我们经常需要调用bytes_in_flight()来计算那些已发送但未被确认的字节数,由于std::queue不能很方便的遍历,即使要遍历的话,也很浪费时间
    • 所以直接设置一个变量_bytes_in_flight,来动态的记录这个队列中的TCP报文段的字节数之和,以提高时间效率
  • 连续重传次数,_consecutive_retransmission
  • 定时器相关
    • 计时器启动后已经经历的毫秒数,_timer_ms
    • 定时器是否启动,_timer_running

第一阶段

分析

  • 大部分的疑问和思考见前言
  • 本阶段的代码中,完全没有考虑接收方窗口大小为0的情况

    代码

    tcp_sender.hh ```cpp

    ifndef SPONGE_LIBSPONGE_TCP_SENDER_HH

    define SPONGE_LIBSPONGE_TCP_SENDER_HH

include “byte_stream.hh”

include “tcp_config.hh”

include “tcp_segment.hh”

include “wrapping_integers.hh”

include

include

//! \brief The “sender” part of a TCP implementation.

//! Accepts a ByteStream, divides it up into segments and sends the //! segments, keeps track of which segments are still in-flight, //! maintains the Retransmission Timer, and retransmits in-flight //! segments if the retransmission timer expires. class TCPSender { private: // 发送方随机选择的ISN //! our initial sequence number, the number for our SYN. WrappingInt32 _isn;

  1. // 把需要发送的报文段放到这个队列
  2. //! outbound queue of segments that the TCPSender wants sent
  3. std::queue<TCPSegment> _segments_out{};
  4. // 这个变量保存的是RTO的初始值
  5. //! retransmission timer for the connection
  6. unsigned int _initial_retransmission_timeout;
  7. // 还没有发送的数据
  8. //! outgoing stream of bytes that have not yet been sent
  9. ByteStream _stream;
  10. // 未发送数据的第一个字节的绝对序号,包括SYN和FIN
  11. //! the (absolute) sequence number for the next byte to be sent
  12. uint64_t _next_seqno{0};
  13. size_t _lately_ackno = 0; // 从接收方返回的最大的ackno
  14. size_t _window_size = 0; // 和_lately_ackno配套的window size
  15. size_t _bytes_in_flight = 0; // 是_segments_outstanding中的序号空间大小
  16. std::queue<TCPSegment> _segments_outstanding{}; // 已发送但未确认的报文段
  17. size_t _retransmission_timeout = 0; // RTO
  18. size_t _consecutive_retransmission = 0; // 连续重传次数
  19. size_t _timer_ms = 0; // 计时器启动后经过的毫秒数
  20. bool _timer_running = false; // 计时器是否启动
  21. void send_segment(TCPSegment &seg); // 发送已经填写了必要信息的报文段,这个方法内部只会再向TCPSegment中填入seqno

public: //! Initialize a TCPSender // 构造函数的参数:容量、初始超时时间(RTO的初始值)、ISN TCPSender(const size_t capacity = TCPConfig::DEFAULT_CAPACITY, const uint16_t retx_timeout = TCPConfig::TIMEOUT_DFLT, const std::optional fixed_isn = {});

  1. //! \name "Input" interface for the writer
  2. //!@{
  3. ByteStream &stream_in() { return _stream; }
  4. const ByteStream &stream_in() const { return _stream; }
  5. //!@}
  6. //! \name Methods that can cause the TCPSender to send a segment
  7. //!@{
  8. //! \brief A new acknowledgment was received
  9. void ack_received(const WrappingInt32 ackno, const uint16_t window_size);
  10. //! \brief Generate an empty-payload segment (useful for creating empty ACK segments)
  11. void send_empty_segment();
  12. //! \brief create and send segments to fill as much of the window as possible
  13. void fill_window();
  14. // 每隔几毫秒,您的TCPSender的tick方法将被调用,并带有一个参数,该参数指示自上次调用该方法以来已经过了几毫秒
  15. //! \brief Notifies the TCPSender of the passage of time
  16. void tick(const size_t ms_since_last_tick);
  17. //!@}
  18. //! \name Accessors
  19. //!@{
  20. // 已发送但未确认的字节数,SYN和FIN各占1字节
  21. //! \brief How many sequence numbers are occupied by segments sent but not yet acknowledged?
  22. //! \note count is in "sequence space," i.e. SYN and FIN each count for one byte
  23. //! (see TCPSegment::length_in_sequence_space())
  24. size_t bytes_in_flight() const;
  25. // 连续重传次数
  26. //! \brief Number of consecutive retransmissions that have occurred in a row
  27. unsigned int consecutive_retransmissions() const;
  28. //! \brief TCPSegments that the TCPSender has enqueued for transmission.
  29. //! \note These must be dequeued and sent by the TCPConnection,
  30. //! which will need to fill in the fields that are set by the TCPReceiver
  31. //! (ackno and window size) before sending.
  32. std::queue<TCPSegment> &segments_out() { return _segments_out; }
  33. //!@}
  34. //! \name What is the next sequence number? (used for testing)
  35. //!@{
  36. //! \brief absolute seqno for the next byte to be sent
  37. uint64_t next_seqno_absolute() const { return _next_seqno; }
  38. //! \brief relative seqno for the next byte to be sent
  39. WrappingInt32 next_seqno() const { return wrap(_next_seqno, _isn); }
  40. //!@}

};

endif // SPONGE_LIBSPONGE_TCP_SENDER_HH

  1. tcp_sender.cc
  2. ```cpp
  3. #include "tcp_sender.hh"
  4. #include "tcp_config.hh"
  5. #include <random>
  6. // Dummy implementation of a TCP sender
  7. // For Lab 3, please replace with a real implementation that passes the
  8. // automated checks run by `make check_lab3`.
  9. template <typename... Targs>
  10. void DUMMY_CODE(Targs &&... /* unused */) {}
  11. using namespace std;
  12. //! \param[in] capacity the capacity of the outgoing byte stream
  13. //! \param[in] retx_timeout the initial amount of time to wait before retransmitting the oldest outstanding segment
  14. //! \param[in] fixed_isn the Initial Sequence Number to use, if set (otherwise uses a random ISN)
  15. TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn)
  16. : _isn(fixed_isn.value_or(WrappingInt32{random_device()()}))
  17. , _initial_retransmission_timeout{retx_timeout}
  18. , _stream(capacity)
  19. ,_lately_ackno(0), _window_size(0)
  20. ,_bytes_in_flight(0), _segments_outstanding()
  21. ,_retransmission_timeout{retx_timeout}, _consecutive_retransmission(0)
  22. ,_timer_ms(0), _timer_running(false) {}
  23. // _segments_outstanding中所有报文段的和,包括SYN和FIN
  24. uint64_t TCPSender::bytes_in_flight() const {
  25. return _bytes_in_flight;
  26. }
  27. void TCPSender::fill_window() {
  28. if (_next_seqno==0) { // 状态0 CLOSED
  29. // 发送不带数据的SYN报文
  30. TCPSegment seg;
  31. seg.header().syn=true;
  32. send_segment(seg);
  33. } else if (_next_seqno>0&&bytes_in_flight()==_next_seqno) { // 状态1 SYN_SENT
  34. // 在未收到任何ACK之前,假设接收方的窗口大小为1,所以这里不能再发送任何数据
  35. } else if (!_stream.eof()&&bytes_in_flight()<_next_seqno) { // 状态2 SYN_ACKED
  36. /* 考察区间[_next_seqno, _lately_ackno+_window_size)
  37. 如果区间不存在,则不能发送新的数据
  38. 如果window_size==0,则可以发送1个字节的数据
  39. */
  40. if (_lately_ackno+_window_size>_next_seqno) {
  41. // 有可以发送的数据
  42. std::string data_to_send = _stream.read(_lately_ackno+_window_size-_next_seqno);
  43. TCPSegment seg;
  44. seg.payload() = Buffer(std::move(data_to_send));
  45. // 如果stream读完了,且还有余下的空间放置FIN
  46. if (_stream.eof()&&data_to_send.size()<_lately_ackno+_window_size-_next_seqno) {
  47. seg.header().fin=true;
  48. }
  49. send_segment(seg);
  50. }
  51. } else if (_stream.eof()&&_next_seqno<_stream.bytes_written()+2) { // 状态3 SYN_ACKED and FIN not Sent
  52. // 这种状态下一定只剩下1个FIN没放进TCPSegment的队列
  53. // 接收窗口也可能是0吧?
  54. if (_lately_ackno+_window_size>_next_seqno) {
  55. TCPSegment seg;
  56. seg.header().fin=true;
  57. send_segment(seg);
  58. }
  59. } else if (_stream.eof()&&_next_seqno==_stream.bytes_written()+2&&bytes_in_flight()>0) { // 状态4 FIN_SENT
  60. // 这里不用发送数据
  61. } else if (_stream.eof()&&_next_seqno==_stream.bytes_written()+2&&bytes_in_flight()==0) { // 状态5 FIN_ACKED
  62. // 这里不用发送数据
  63. } else { // 应该不会到这个分支
  64. }
  65. }
  66. //! \param ackno The remote receiver's ackno (acknowledgment number)
  67. //! \param window_size The remote receiver's advertised window size
  68. void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
  69. size_t _ackno=unwrap(ackno, _isn, _lately_ackno);
  70. if (_ackno>=_lately_ackno) {
  71. // if (_ackno>_lately_ackno) { // 这样的话,部分确认也会重置RTO和清零连续重传次数
  72. // _consecutive_retransmission=0;
  73. // _retransmission_timeout=_initial_retransmission_timeout;
  74. // }
  75. _lately_ackno=_ackno;
  76. _window_size=window_size;
  77. }
  78. bool seg_acked=false;
  79. while (_segments_outstanding.size()) {
  80. TCPSegment seg = _segments_outstanding.front();
  81. size_t abs_seqno = unwrap(seg.header().seqno, _isn, _lately_ackno);
  82. if (abs_seqno+seg.length_in_sequence_space()<=_lately_ackno) {
  83. seg_acked=true;
  84. _segments_outstanding.pop();
  85. } else {
  86. break;
  87. }
  88. }
  89. // 我认为这里_timer_running一定是true
  90. if (_segments_outstanding.empty()) {
  91. _timer_running=false; // 关闭定时器
  92. } else if (seg_acked) { // 重启定时器
  93. _timer_running=true;
  94. _timer_ms=0;
  95. }
  96. // 这样的话,部分确认不会重置RTO和清零连续重传次数吗?
  97. if (seg_acked) {
  98. _consecutive_retransmission=0;
  99. _retransmission_timeout=_initial_retransmission_timeout;
  100. }
  101. // 如果接收窗口有了新的空间,TCPSender应该再次填充该窗口。
  102. // todo
  103. }
  104. //! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method
  105. void TCPSender::tick(const size_t ms_since_last_tick) {
  106. // 目前没有考虑窗口大小为0的情况
  107. if (_timer_running) {
  108. _timer_ms+=ms_since_last_tick;
  109. if (_timer_ms>=_retransmission_timeout) { // 定时器超时,需要重传最早的未被确认的报文段
  110. _timer_ms=0; // 重启定时器
  111. _consecutive_retransmission++; // 重传次数加一
  112. _retransmission_timeout<<=1; // RTO加倍
  113. if (_segments_outstanding.size()) { // 这个判断应该一定是成立的吧?
  114. _segments_out.push(_segments_outstanding.front());
  115. }
  116. }
  117. }
  118. }
  119. unsigned int TCPSender::consecutive_retransmissions() const {
  120. return _consecutive_retransmission;
  121. }
  122. // 不带SYN、FIN,且不包含数据,只需要设置正确的seqno
  123. void TCPSender::send_empty_segment() {
  124. TCPSegment seg;
  125. seg.header().seqno=wrap(_next_seqno, _isn);
  126. _segments_out.push(seg);
  127. // 不用加入_segments_outstanding,也不用启动定时器
  128. }
  129. void TCPSender::send_segment(TCPSegment &seg) {
  130. seg.header().seqno=wrap(_next_seqno, _isn);
  131. _next_seqno+=seg.length_in_sequence_space();
  132. _bytes_in_flight+=seg.length_in_sequence_space();
  133. _segments_out.push(seg);
  134. _segments_outstanding.push(seg);
  135. // 每次发送报文段都要检查定时器,如果未启动则启动
  136. if (!_timer_running) {
  137. _timer_ms=0;
  138. _timer_running=true;
  139. }
  140. }

测试结果

image.png

When filling window, treat a ‘0’ window size as equal to ‘1’ but don’t back off RTO

Retx SYN twice at the right times, then ack

DEBUG

  • 问题一:在收到新的ack后,从_segments_outstanding中删除已经被完全确认的报文段,此时_bytes_in_flight这个变量应该更新,我忘记了
  • 问题二:接收窗口很大,且待发送的数据很多,这种情况下,调用fill_window()一次,就会产生多个TCPSegment,我实现fill_window每次只会产生1个TCPSegment
  • 问题三:在收到新的ack后,需要手动调用fill_window(),上面的代码中没有加
    • 目前,我把这个调用和RTO的初始化、连续重传次数的清零放在了一起
    • 只有在至少完全确认一个报文段后,才会调用fill_window(),如果只是部分确认,就不会调用fill_window()
    • 其它可能的写法
      • 无论是否收到新的ack,在ack_received中一定会调用fill_window()
      • 收到新的ack,部分确认了最早已发送但未确认的报文段,那么就调用fill_window()
  • 问题四:接收窗口为0的情况未处理
    • 我一开始接受窗口的初始化值为0(在接收任何接收方返回的报文段之前,应假设为1),现在修改为了1
    • 在收到新的ack,更新了_window_size=0后
      • 超时后不再增加重传次数,也不会RTO加倍
      • 在fill_window()方法中,如果读到_window_size=0,应将其当做1,所以设置了一个叫window_size的局部变量

第二阶段

代码

在修改了第一阶段的问题后,代码如下

tcp_sender.hh(无改动)

  1. #ifndef SPONGE_LIBSPONGE_TCP_SENDER_HH
  2. #define SPONGE_LIBSPONGE_TCP_SENDER_HH
  3. #include "byte_stream.hh"
  4. #include "tcp_config.hh"
  5. #include "tcp_segment.hh"
  6. #include "wrapping_integers.hh"
  7. #include <functional>
  8. #include <queue>
  9. //! \brief The "sender" part of a TCP implementation.
  10. //! Accepts a ByteStream, divides it up into segments and sends the
  11. //! segments, keeps track of which segments are still in-flight,
  12. //! maintains the Retransmission Timer, and retransmits in-flight
  13. //! segments if the retransmission timer expires.
  14. class TCPSender {
  15. private:
  16. // 发送方随机选择的ISN
  17. //! our initial sequence number, the number for our SYN.
  18. WrappingInt32 _isn;
  19. // 把需要发送的报文段放到这个队列
  20. //! outbound queue of segments that the TCPSender wants sent
  21. std::queue<TCPSegment> _segments_out{};
  22. // 这个变量保存的是RTO的初始值
  23. //! retransmission timer for the connection
  24. unsigned int _initial_retransmission_timeout;
  25. // 还没有发送的数据
  26. //! outgoing stream of bytes that have not yet been sent
  27. ByteStream _stream;
  28. // 未发送数据的第一个字节的绝对序号,包括SYN和FIN
  29. //! the (absolute) sequence number for the next byte to be sent
  30. uint64_t _next_seqno{0};
  31. size_t _lately_ackno = 0; // 从接收方返回的最大的ackno
  32. size_t _window_size = 0; // 和_lately_ackno配套的window size
  33. size_t _bytes_in_flight = 0; // 是_segments_outstanding中的序号空间大小
  34. std::queue<TCPSegment> _segments_outstanding{}; // 已发送但未确认的报文段
  35. size_t _retransmission_timeout = 0; // RTO
  36. size_t _consecutive_retransmission = 0; // 连续重传次数
  37. size_t _timer_ms = 0; // 计时器启动后经过的毫秒数
  38. bool _timer_running = false; // 计时器是否启动
  39. void send_segment(TCPSegment &seg); // 发送已经填写了必要信息的报文段,这个方法内部只会再向TCPSegment中填入seqno
  40. public:
  41. //! Initialize a TCPSender
  42. // 构造函数的参数:容量、初始超时时间(RTO的初始值)、ISN
  43. TCPSender(const size_t capacity = TCPConfig::DEFAULT_CAPACITY,
  44. const uint16_t retx_timeout = TCPConfig::TIMEOUT_DFLT,
  45. const std::optional<WrappingInt32> fixed_isn = {});
  46. //! \name "Input" interface for the writer
  47. //!@{
  48. ByteStream &stream_in() { return _stream; }
  49. const ByteStream &stream_in() const { return _stream; }
  50. //!@}
  51. //! \name Methods that can cause the TCPSender to send a segment
  52. //!@{
  53. //! \brief A new acknowledgment was received
  54. void ack_received(const WrappingInt32 ackno, const uint16_t window_size);
  55. //! \brief Generate an empty-payload segment (useful for creating empty ACK segments)
  56. void send_empty_segment();
  57. //! \brief create and send segments to fill as much of the window as possible
  58. void fill_window();
  59. // 每隔几毫秒,您的TCPSender的tick方法将被调用,并带有一个参数,该参数指示自上次调用该方法以来已经过了几毫秒
  60. //! \brief Notifies the TCPSender of the passage of time
  61. void tick(const size_t ms_since_last_tick);
  62. //!@}
  63. //! \name Accessors
  64. //!@{
  65. // 已发送但未确认的字节数,SYN和FIN各占1字节
  66. //! \brief How many sequence numbers are occupied by segments sent but not yet acknowledged?
  67. //! \note count is in "sequence space," i.e. SYN and FIN each count for one byte
  68. //! (see TCPSegment::length_in_sequence_space())
  69. size_t bytes_in_flight() const;
  70. // 连续重传次数
  71. //! \brief Number of consecutive retransmissions that have occurred in a row
  72. unsigned int consecutive_retransmissions() const;
  73. //! \brief TCPSegments that the TCPSender has enqueued for transmission.
  74. //! \note These must be dequeued and sent by the TCPConnection,
  75. //! which will need to fill in the fields that are set by the TCPReceiver
  76. //! (ackno and window size) before sending.
  77. std::queue<TCPSegment> &segments_out() { return _segments_out; }
  78. //!@}
  79. //! \name What is the next sequence number? (used for testing)
  80. //!@{
  81. //! \brief absolute seqno for the next byte to be sent
  82. uint64_t next_seqno_absolute() const { return _next_seqno; }
  83. //! \brief relative seqno for the next byte to be sent
  84. WrappingInt32 next_seqno() const { return wrap(_next_seqno, _isn); }
  85. //!@}
  86. };
  87. #endif // SPONGE_LIBSPONGE_TCP_SENDER_HH

tcp_sender.cc

  1. #include "tcp_sender.hh"
  2. #include "tcp_config.hh"
  3. #include <random>
  4. #include <iostream>
  5. #include <cmath>
  6. // Dummy implementation of a TCP sender
  7. // For Lab 3, please replace with a real implementation that passes the
  8. // automated checks run by `make check_lab3`.
  9. template <typename... Targs>
  10. void DUMMY_CODE(Targs &&... /* unused */) {}
  11. using namespace std;
  12. //! \param[in] capacity the capacity of the outgoing byte stream
  13. //! \param[in] retx_timeout the initial amount of time to wait before retransmitting the oldest outstanding segment
  14. //! \param[in] fixed_isn the Initial Sequence Number to use, if set (otherwise uses a random ISN)
  15. TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn)
  16. : _isn(fixed_isn.value_or(WrappingInt32{random_device()()}))
  17. , _initial_retransmission_timeout{retx_timeout}
  18. , _stream(capacity)
  19. ,_lately_ackno(0), _window_size(1) // fixed
  20. ,_bytes_in_flight(0), _segments_outstanding()
  21. ,_retransmission_timeout{retx_timeout}, _consecutive_retransmission(0)
  22. ,_timer_ms(0), _timer_running(false) {}
  23. // _segments_outstanding中所有报文段的和,包括SYN和FIN
  24. uint64_t TCPSender::bytes_in_flight() const {
  25. return _bytes_in_flight;
  26. }
  27. void TCPSender::fill_window() {
  28. if (_next_seqno==0) { // 状态0 CLOSED
  29. // 发送不带数据的SYN报文
  30. TCPSegment seg;
  31. seg.header().syn=true;
  32. send_segment(seg);
  33. std::cout<<"~~~~~~~~~~~~~~~~~~~~~~~~ start with RTO: "<<_retransmission_timeout<<std::endl;
  34. } else if (_next_seqno>0&&bytes_in_flight()==_next_seqno) { // 状态1 SYN_SENT
  35. // 在未收到任何ACK之前,假设接收方的窗口大小为1,所以这里不能再发送任何数据
  36. } else if (!_stream.eof()&&bytes_in_flight()<_next_seqno) { // 状态2 SYN_ACKED
  37. /* 考察区间[_next_seqno, _lately_ackno+_window_size)
  38. 如果区间不存在,则不能发送新的数据
  39. 如果window_size==0,则可以发送1个字节的数据
  40. */
  41. size_t window_size=_window_size==0?1:_window_size; // fixed
  42. if (_lately_ackno+window_size>_next_seqno&&!_stream.buffer_empty()) { // fixed
  43. // 注意MAX_PAYLOAD_SIZE=1452字节 fixed
  44. // 有可以发送的数据
  45. size_t size_to_send = _lately_ackno+window_size-_next_seqno;
  46. while (size_to_send>0&&!_stream.buffer_empty()) {
  47. size_t size_to_read = std::min(size_to_send, TCPConfig::MAX_PAYLOAD_SIZE);
  48. // std::string data_to_send = _stream.read(_lately_ackno+window_size-_next_seqno);
  49. std::string data_to_send = _stream.read(size_to_read);
  50. size_to_send-=data_to_send.size();
  51. std::cout<<"****** size_to_read: "<<size_to_read<<" data_to_send.size(): "<<data_to_send.size();
  52. TCPSegment seg;
  53. seg.payload() = Buffer(std::move(data_to_send));
  54. // 如果stream读完了,且还有余下的空间放置FIN
  55. if (_stream.eof()&&size_to_send>0) {
  56. // 这样的话循环条件一定不成立,所以size_to_send也没必要再-1了
  57. seg.header().fin=true;
  58. }
  59. send_segment(seg);
  60. std::cout<<" _segments_out.size(): "<<_segments_out.size()<<std::endl;
  61. if (_segments_out.size()) {
  62. std::cout<<_segments_out.front().payload().str()<<std::endl;
  63. }
  64. }
  65. }
  66. } else if (_stream.eof()&&_next_seqno<_stream.bytes_written()+2) { // 状态3 SYN_ACKED and FIN not Sent
  67. // 这种状态下一定只剩下1个FIN没放进TCPSegment的队列
  68. // 接收窗口也可能是0吧?
  69. size_t window_size=_window_size==0?1:_window_size; // fixed
  70. if (_lately_ackno+window_size>_next_seqno) {
  71. TCPSegment seg;
  72. seg.header().fin=true;
  73. send_segment(seg);
  74. }
  75. } else if (_stream.eof()&&_next_seqno==_stream.bytes_written()+2&&bytes_in_flight()>0) { // 状态4 FIN_SENT
  76. // 这里不用发送数据
  77. } else if (_stream.eof()&&_next_seqno==_stream.bytes_written()+2&&bytes_in_flight()==0) { // 状态5 FIN_ACKED
  78. // 这里不用发送数据
  79. } else { // 应该不会到这个分支
  80. }
  81. }
  82. //! \param ackno The remote receiver's ackno (acknowledgment number)
  83. //! \param window_size The remote receiver's advertised window size
  84. void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
  85. size_t _ackno=unwrap(ackno, _isn, _lately_ackno);
  86. std::cout<<"***** _ackno: "<<_ackno<<" _lately_ackno: "<<_lately_ackno<<std::endl;
  87. if (_ackno>=_lately_ackno) {
  88. // if (_ackno>_lately_ackno) { // 这样的话,部分确认也会重置RTO和清零连续重传次数
  89. // _consecutive_retransmission=0;
  90. // _retransmission_timeout=_initial_retransmission_timeout;
  91. // }
  92. _lately_ackno=_ackno;
  93. _window_size=window_size; // 这里window_size可能是0
  94. }
  95. bool seg_acked=false;
  96. while (_segments_outstanding.size()) {
  97. TCPSegment seg = _segments_outstanding.front();
  98. size_t abs_seqno = unwrap(seg.header().seqno, _isn, _lately_ackno);
  99. if (abs_seqno+seg.length_in_sequence_space()<=_lately_ackno) {
  100. seg_acked=true;
  101. _segments_outstanding.pop();
  102. _bytes_in_flight-=seg.length_in_sequence_space(); // fixed
  103. } else {
  104. break;
  105. }
  106. }
  107. // 我认为这里_timer_running一定是true
  108. if (_segments_outstanding.empty()) {
  109. _timer_running=false; // 关闭定时器
  110. } else if (seg_acked) { // 重启定时器
  111. _timer_running=true;
  112. _timer_ms=0;
  113. }
  114. // 这样的话,部分确认不会重置RTO和清零连续重传次数,也不会调用fill_window()
  115. if (seg_acked) {
  116. _consecutive_retransmission=0;
  117. _retransmission_timeout=_initial_retransmission_timeout;
  118. fill_window(); // fixed
  119. }
  120. // 如果接收窗口有了新的空间,TCPSender应该再次填充该窗口。
  121. // todo
  122. }
  123. //! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method
  124. void TCPSender::tick(const size_t ms_since_last_tick) {
  125. // 目前没有考虑窗口大小为0的情况
  126. if (_timer_running) {
  127. _timer_ms+=ms_since_last_tick;
  128. if (_timer_ms>=_retransmission_timeout) { // 定时器超时,需要重传最早的未被确认的报文段
  129. _timer_ms=0; // 重启定时器
  130. // if (_window_size>0||(_next_seqno>0&&bytes_in_flight()==_next_seqno)) { // fixed
  131. if (_window_size>0) { // fixed
  132. _consecutive_retransmission++; // 重传次数加一
  133. _retransmission_timeout<<=1; // RTO加倍
  134. }
  135. if (_segments_outstanding.size()) { // 这个判断应该一定是成立的吧?
  136. _segments_out.push(_segments_outstanding.front());
  137. }
  138. }
  139. }
  140. }
  141. unsigned int TCPSender::consecutive_retransmissions() const {
  142. return _consecutive_retransmission;
  143. }
  144. // 不带SYN、FIN,且不包含数据,只需要设置正确的seqno
  145. void TCPSender::send_empty_segment() {
  146. TCPSegment seg;
  147. seg.header().seqno=wrap(_next_seqno, _isn);
  148. _segments_out.push(seg);
  149. // 不用加入_segments_outstanding,也不用启动定时器
  150. }
  151. void TCPSender::send_segment(TCPSegment &seg) {
  152. seg.header().seqno=wrap(_next_seqno, _isn);
  153. _next_seqno+=seg.length_in_sequence_space();
  154. _bytes_in_flight+=seg.length_in_sequence_space();
  155. _segments_out.push(seg);
  156. _segments_outstanding.push(seg);
  157. // 每次发送报文段都要检查定时器,如果未启动则启动
  158. if (!_timer_running) {
  159. _timer_ms=0;
  160. _timer_running=true;
  161. }
  162. }

测试结果

image.png

第三阶段

说明

  • 引入Lab4的代码后,新增了测试,Lab3又有样例没过,经过DEBUG,发现是少考虑一种特殊情况
    • Impossible ackno (beyond next seqno) is ignored
    • 只需要在ack_received方法前加入相关判断即可

image.png

代码

  1. #ifndef SPONGE_LIBSPONGE_TCP_SENDER_HH
  2. #define SPONGE_LIBSPONGE_TCP_SENDER_HH
  3. #include "byte_stream.hh"
  4. #include "tcp_config.hh"
  5. #include "tcp_segment.hh"
  6. #include "wrapping_integers.hh"
  7. #include <functional>
  8. #include <queue>
  9. //! \brief The "sender" part of a TCP implementation.
  10. //! Accepts a ByteStream, divides it up into segments and sends the
  11. //! segments, keeps track of which segments are still in-flight,
  12. //! maintains the Retransmission Timer, and retransmits in-flight
  13. //! segments if the retransmission timer expires.
  14. class TCPSender {
  15. private:
  16. // 发送方随机选择的ISN
  17. //! our initial sequence number, the number for our SYN.
  18. WrappingInt32 _isn;
  19. // 把需要发送的报文段放到这个队列
  20. //! outbound queue of segments that the TCPSender wants sent
  21. std::queue<TCPSegment> _segments_out{};
  22. // 这个变量保存的是RTO的初始值
  23. //! retransmission timer for the connection
  24. unsigned int _initial_retransmission_timeout;
  25. // 还没有发送的数据
  26. //! outgoing stream of bytes that have not yet been sent
  27. ByteStream _stream;
  28. // 未发送数据的第一个字节的绝对序号,包括SYN和FIN
  29. //! the (absolute) sequence number for the next byte to be sent
  30. uint64_t _next_seqno{0};
  31. size_t _lately_ackno = 0; // 从接收方返回的最大的ackno
  32. size_t _window_size = 0; // 和_lately_ackno配套的window size
  33. size_t _bytes_in_flight = 0; // 是_segments_outstanding中的序号空间大小
  34. std::queue<TCPSegment> _segments_outstanding{}; // 已发送但未确认的报文段
  35. size_t _retransmission_timeout = 0; // RTO
  36. size_t _consecutive_retransmission = 0; // 连续重传次数
  37. size_t _timer_ms = 0; // 计时器启动后经过的毫秒数
  38. bool _timer_running = false; // 计时器是否启动
  39. void send_segment(TCPSegment &seg); // 发送已经填写了必要信息的报文段,这个方法内部只会再向TCPSegment中填入seqno
  40. public:
  41. //! Initialize a TCPSender
  42. // 构造函数的参数:容量、初始超时时间(RTO的初始值)、ISN
  43. TCPSender(const size_t capacity = TCPConfig::DEFAULT_CAPACITY,
  44. const uint16_t retx_timeout = TCPConfig::TIMEOUT_DFLT,
  45. const std::optional<WrappingInt32> fixed_isn = {});
  46. //! \name "Input" interface for the writer
  47. //!@{
  48. ByteStream &stream_in() { return _stream; }
  49. const ByteStream &stream_in() const { return _stream; }
  50. //!@}
  51. //! \name Methods that can cause the TCPSender to send a segment
  52. //!@{
  53. //! \brief A new acknowledgment was received
  54. void ack_received(const WrappingInt32 ackno, const uint16_t window_size);
  55. //! \brief Generate an empty-payload segment (useful for creating empty ACK segments)
  56. void send_empty_segment();
  57. //! \brief create and send segments to fill as much of the window as possible
  58. void fill_window();
  59. // 每隔几毫秒,您的TCPSender的tick方法将被调用,并带有一个参数,该参数指示自上次调用该方法以来已经过了几毫秒
  60. //! \brief Notifies the TCPSender of the passage of time
  61. void tick(const size_t ms_since_last_tick);
  62. //!@}
  63. //! \name Accessors
  64. //!@{
  65. // 已发送但未确认的字节数,SYN和FIN各占1字节
  66. //! \brief How many sequence numbers are occupied by segments sent but not yet acknowledged?
  67. //! \note count is in "sequence space," i.e. SYN and FIN each count for one byte
  68. //! (see TCPSegment::length_in_sequence_space())
  69. size_t bytes_in_flight() const;
  70. // 连续重传次数
  71. //! \brief Number of consecutive retransmissions that have occurred in a row
  72. unsigned int consecutive_retransmissions() const;
  73. //! \brief TCPSegments that the TCPSender has enqueued for transmission.
  74. //! \note These must be dequeued and sent by the TCPConnection,
  75. //! which will need to fill in the fields that are set by the TCPReceiver
  76. //! (ackno and window size) before sending.
  77. std::queue<TCPSegment> &segments_out() { return _segments_out; }
  78. //!@}
  79. //! \name What is the next sequence number? (used for testing)
  80. //!@{
  81. //! \brief absolute seqno for the next byte to be sent
  82. uint64_t next_seqno_absolute() const { return _next_seqno; }
  83. //! \brief relative seqno for the next byte to be sent
  84. WrappingInt32 next_seqno() const { return wrap(_next_seqno, _isn); }
  85. //!@}
  86. };
  87. #endif // SPONGE_LIBSPONGE_TCP_SENDER_HH
  88. #include "tcp_sender.hh"
  89. #include "tcp_config.hh"
  90. #include <random>
  91. #include <iostream>
  92. #include <cmath>
  93. // Dummy implementation of a TCP sender
  94. // For Lab 3, please replace with a real implementation that passes the
  95. // automated checks run by `make check_lab3`.
  96. template <typename... Targs>
  97. void DUMMY_CODE(Targs &&... /* unused */) {}
  98. using namespace std;
  99. //! \param[in] capacity the capacity of the outgoing byte stream
  100. //! \param[in] retx_timeout the initial amount of time to wait before retransmitting the oldest outstanding segment
  101. //! \param[in] fixed_isn the Initial Sequence Number to use, if set (otherwise uses a random ISN)
  102. TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn)
  103. : _isn(fixed_isn.value_or(WrappingInt32{random_device()()}))
  104. , _initial_retransmission_timeout{retx_timeout}
  105. , _stream(capacity)
  106. ,_lately_ackno(0), _window_size(1) // fixed
  107. ,_bytes_in_flight(0), _segments_outstanding()
  108. ,_retransmission_timeout{retx_timeout}, _consecutive_retransmission(0)
  109. ,_timer_ms(0), _timer_running(false) {}
  110. // _segments_outstanding中所有报文段的和,包括SYN和FIN
  111. uint64_t TCPSender::bytes_in_flight() const {
  112. return _bytes_in_flight;
  113. }
  114. void TCPSender::fill_window() {
  115. if (_next_seqno==0) { // 状态0 CLOSED
  116. // 发送不带数据的SYN报文
  117. TCPSegment seg;
  118. seg.header().syn=true;
  119. send_segment(seg);
  120. std::cout<<"~~~~~~~~~~~~~~~~~~~~~~~~ start with RTO: "<<_retransmission_timeout<<std::endl;
  121. } else if (_next_seqno>0&&bytes_in_flight()==_next_seqno) { // 状态1 SYN_SENT
  122. // 在未收到任何ACK之前,假设接收方的窗口大小为1,所以这里不能再发送任何数据
  123. } else if (!_stream.eof()&&bytes_in_flight()<_next_seqno) { // 状态2 SYN_ACKED
  124. /* 考察区间[_next_seqno, _lately_ackno+_window_size)
  125. 如果区间不存在,则不能发送新的数据
  126. 如果window_size==0,则可以发送1个字节的数据
  127. */
  128. size_t window_size=_window_size==0?1:_window_size; // fixed
  129. if (_lately_ackno+window_size>_next_seqno&&!_stream.buffer_empty()) { // fixed
  130. // 注意MAX_PAYLOAD_SIZE=1452字节 fixed
  131. // 有可以发送的数据
  132. size_t size_to_send = _lately_ackno+window_size-_next_seqno;
  133. while (size_to_send>0&&!_stream.buffer_empty()) {
  134. size_t size_to_read = std::min(size_to_send, TCPConfig::MAX_PAYLOAD_SIZE);
  135. // std::string data_to_send = _stream.read(_lately_ackno+window_size-_next_seqno);
  136. std::string data_to_send = _stream.read(size_to_read);
  137. size_to_send-=data_to_send.size();
  138. std::cout<<"****** size_to_read: "<<size_to_read<<" data_to_send.size(): "<<data_to_send.size();
  139. TCPSegment seg;
  140. seg.payload() = Buffer(std::move(data_to_send));
  141. // 如果stream读完了,且还有余下的空间放置FIN
  142. if (_stream.eof()&&size_to_send>0) {
  143. // 这样的话循环条件一定不成立,所以size_to_send也没必要再-1了
  144. seg.header().fin=true;
  145. }
  146. send_segment(seg);
  147. std::cout<<" _segments_out.size(): "<<_segments_out.size()<<std::endl;
  148. if (_segments_out.size()) {
  149. std::cout<<_segments_out.front().payload().str()<<std::endl;
  150. }
  151. }
  152. }
  153. } else if (_stream.eof()&&_next_seqno<_stream.bytes_written()+2) { // 状态3 SYN_ACKED and FIN not Sent
  154. // 这种状态下一定只剩下1个FIN没放进TCPSegment的队列
  155. // 接收窗口也可能是0吧?
  156. size_t window_size=_window_size==0?1:_window_size; // fixed
  157. if (_lately_ackno+window_size>_next_seqno) {
  158. TCPSegment seg;
  159. seg.header().fin=true;
  160. send_segment(seg);
  161. }
  162. } else if (_stream.eof()&&_next_seqno==_stream.bytes_written()+2&&bytes_in_flight()>0) { // 状态4 FIN_SENT
  163. // 这里不用发送数据
  164. } else if (_stream.eof()&&_next_seqno==_stream.bytes_written()+2&&bytes_in_flight()==0) { // 状态5 FIN_ACKED
  165. // 这里不用发送数据
  166. } else { // 应该不会到这个分支
  167. }
  168. }
  169. //! \param ackno The remote receiver's ackno (acknowledgment number)
  170. //! \param window_size The remote receiver's advertised window size
  171. void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
  172. size_t _ackno=unwrap(ackno, _isn, _lately_ackno);
  173. // Impossible ackno (beyond next seqno) is ignored
  174. if (_ackno>_next_seqno) {
  175. return;
  176. }
  177. std::cout<<"***** _ackno: "<<_ackno<<" _lately_ackno: "<<_lately_ackno<<std::endl;
  178. if (_ackno>=_lately_ackno) {
  179. // if (_ackno>_lately_ackno) { // 这样的话,部分确认也会重置RTO和清零连续重传次数
  180. // _consecutive_retransmission=0;
  181. // _retransmission_timeout=_initial_retransmission_timeout;
  182. // }
  183. _lately_ackno=_ackno;
  184. _window_size=window_size; // 这里window_size可能是0
  185. }
  186. bool seg_acked=false;
  187. while (_segments_outstanding.size()) {
  188. TCPSegment seg = _segments_outstanding.front();
  189. size_t abs_seqno = unwrap(seg.header().seqno, _isn, _lately_ackno);
  190. if (abs_seqno+seg.length_in_sequence_space()<=_lately_ackno) {
  191. seg_acked=true;
  192. _segments_outstanding.pop();
  193. _bytes_in_flight-=seg.length_in_sequence_space(); // fixed
  194. } else {
  195. break;
  196. }
  197. }
  198. // 我认为这里_timer_running一定是true
  199. if (_segments_outstanding.empty()) {
  200. _timer_running=false; // 关闭定时器
  201. } else if (seg_acked) { // 重启定时器
  202. _timer_running=true;
  203. _timer_ms=0;
  204. }
  205. // 这样的话,部分确认不会重置RTO和清零连续重传次数,也不会调用fill_window()
  206. if (seg_acked) {
  207. _consecutive_retransmission=0;
  208. _retransmission_timeout=_initial_retransmission_timeout;
  209. fill_window(); // fixed
  210. }
  211. // 如果接收窗口有了新的空间,TCPSender应该再次填充该窗口。
  212. // todo
  213. }
  214. //! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method
  215. void TCPSender::tick(const size_t ms_since_last_tick) {
  216. // 目前没有考虑窗口大小为0的情况
  217. if (_timer_running) {
  218. _timer_ms+=ms_since_last_tick;
  219. if (_timer_ms>=_retransmission_timeout) { // 定时器超时,需要重传最早的未被确认的报文段
  220. _timer_ms=0; // 重启定时器
  221. // if (_window_size>0||(_next_seqno>0&&bytes_in_flight()==_next_seqno)) { // fixed
  222. if (_window_size>0) { // fixed
  223. _consecutive_retransmission++; // 重传次数加一
  224. _retransmission_timeout<<=1; // RTO加倍
  225. }
  226. if (_segments_outstanding.size()) { // 这个判断应该一定是成立的吧?
  227. _segments_out.push(_segments_outstanding.front());
  228. }
  229. }
  230. }
  231. }
  232. unsigned int TCPSender::consecutive_retransmissions() const {
  233. return _consecutive_retransmission;
  234. }
  235. // 不带SYN、FIN,且不包含数据,只需要设置正确的seqno
  236. void TCPSender::send_empty_segment() {
  237. TCPSegment seg;
  238. seg.header().seqno=wrap(_next_seqno, _isn);
  239. _segments_out.push(seg);
  240. // 不用加入_segments_outstanding,也不用启动定时器
  241. }
  242. void TCPSender::send_segment(TCPSegment &seg) {
  243. seg.header().seqno=wrap(_next_seqno, _isn);
  244. _next_seqno+=seg.length_in_sequence_space();
  245. _bytes_in_flight+=seg.length_in_sequence_space();
  246. _segments_out.push(seg);
  247. _segments_outstanding.push(seg);
  248. // 每次发送报文段都要检查定时器,如果未启动则启动
  249. if (!_timer_running) {
  250. _timer_ms=0;
  251. _timer_running=true;
  252. }
  253. }