前言
- merge lab3的初始代码,出现如下变化

通读文档后一些疑问
- 谁来调用fill_window()?
- ack_received()需要调用fill_window()吗?
- ack_received会更新接收方的接收窗口
- 放入_segments_out的TCPSegment的ackno怎么填?
- send_empty_segment()的TCPSegment的ackno怎么填?
- 我觉得都不需要填,由TCPConnection来填,只需要填入seqno
- 接收窗口为0的逻辑在哪里处理?怎么处理?
定时器相关疑问
- 定时器第一次启动是什么时候?
- 每次发送序号空间不为0的段时,就会检查定时器是否启动,如果没有启动就启动
- 所以第一次启动,是发送SYN报文段后
- 定时器什么时候会被启动/重新启动
- 每次发送序号空间不为0的段时,就会检查定时器是否启动,如果没有启动就启动
- ack_received()被调用后,接收到1个之前从来没有出现过的更大的ackno,且_segments_out未被清空,则启动或重启
- 我觉得这里应该一定有定时器是开着的,因为_segments_out未被清空,所以一定是重启
- 注意RTO可能被更新(变回初始值)
- 定时器超时后,要再次启动
- 注意RTO可能被更新
- 定时器怎么确认启动时,当前时刻是多少?定时器什么时候超时?定时器怎么确认超时?
定义一个成员变量,表示毫秒数,每次tick()被调用,更新这个毫秒数是否可以认为tick()被调用的足够频繁,以至于定时器认为此时的毫秒数变量指示的就是真实的毫秒数?tick()被调用,在tick()方法中发现当前的毫秒数已经超过了定时器设置的超时毫秒数,则定时器超时- 定义一个成员变量_timer_ms
- 当定时器被启动或重启时,_timer_ms置为0
- 每次tick被调用
- _timer_ms+=ms_since_last_tick
- 若_timer_ms>=RTO,则定时器超时
定时器什么时候会被关闭?
- ack_received()被调用后,_segments_out被清空时,如果当前定时器开着,就关闭
连续重传次数什么时候+1?连续重传次数什么时候清零?TCPSender除了对连续重传次数+1和清零,还需要做别的事情吗?
- 如果tick()被调用,发现定时器超时,则会重传最早未被确认的报文段,此时连续重传次数+1(文档里说if window size is nonzero???)
- ack_received()被调用,接收到1个之前从来没有出现过的更大的ackno,则连续重传次数清零
- RTO什么时候加倍?RTO什么时候恢复初始值?
- 如果tick()被调用,发现定时器超时,则会重传最早未被确认的报文段,此时RTO加倍(文档里说if window size is nonzero???)
- ack_received()被调用,接收到1个之前从来没有出现过的更大的ackno,则RTO变回初始值
需要额外设置哪些成员变量?
- _initial_retransmission_timeout存储的是RTO的初始值,我们还需要存储RTO,_retransmission_timeout
- 最新的ackno和window_size
- 注意_segments_out表示的是所有已经封装好,马上要被发送的报文段队列,我们还需要一个额外的数据结构,来保存那些已发送但未被确认的报文段,前者是后者的子集
- 用std::queue
来保存
- 用std::queue
- 我们经常需要调用bytes_in_flight()来计算那些已发送但未被确认的字节数,由于std::queue不能很方便的遍历,即使要遍历的话,也很浪费时间
- 所以直接设置一个变量_bytes_in_flight,来动态的记录这个队列中的TCP报文段的字节数之和,以提高时间效率
- 连续重传次数,_consecutive_retransmission
- 定时器相关
- 计时器启动后已经经历的毫秒数,_timer_ms
- 定时器是否启动,_timer_running
第一阶段
分析
- 大部分的疑问和思考见前言
- 本阶段的代码中,完全没有考虑接收方窗口大小为0的情况
代码
tcp_sender.hh ```cppifndef SPONGE_LIBSPONGE_TCP_SENDER_HH
define SPONGE_LIBSPONGE_TCP_SENDER_HH
include “byte_stream.hh”
include “tcp_config.hh”
include “tcp_segment.hh”
include “wrapping_integers.hh”
include
include
//! \brief The “sender” part of a TCP implementation.
//! Accepts a ByteStream, divides it up into segments and sends the //! segments, keeps track of which segments are still in-flight, //! maintains the Retransmission Timer, and retransmits in-flight //! segments if the retransmission timer expires. class TCPSender { private: // 发送方随机选择的ISN //! our initial sequence number, the number for our SYN. WrappingInt32 _isn;
// 把需要发送的报文段放到这个队列//! outbound queue of segments that the TCPSender wants sentstd::queue<TCPSegment> _segments_out{};// 这个变量保存的是RTO的初始值//! retransmission timer for the connectionunsigned int _initial_retransmission_timeout;// 还没有发送的数据//! outgoing stream of bytes that have not yet been sentByteStream _stream;// 未发送数据的第一个字节的绝对序号,包括SYN和FIN//! the (absolute) sequence number for the next byte to be sentuint64_t _next_seqno{0};size_t _lately_ackno = 0; // 从接收方返回的最大的acknosize_t _window_size = 0; // 和_lately_ackno配套的window sizesize_t _bytes_in_flight = 0; // 是_segments_outstanding中的序号空间大小std::queue<TCPSegment> _segments_outstanding{}; // 已发送但未确认的报文段size_t _retransmission_timeout = 0; // RTOsize_t _consecutive_retransmission = 0; // 连续重传次数size_t _timer_ms = 0; // 计时器启动后经过的毫秒数bool _timer_running = false; // 计时器是否启动void send_segment(TCPSegment &seg); // 发送已经填写了必要信息的报文段,这个方法内部只会再向TCPSegment中填入seqno
public:
//! Initialize a TCPSender
// 构造函数的参数:容量、初始超时时间(RTO的初始值)、ISN
TCPSender(const size_t capacity = TCPConfig::DEFAULT_CAPACITY,
const uint16_t retx_timeout = TCPConfig::TIMEOUT_DFLT,
const std::optional
//! \name "Input" interface for the writer//!@{ByteStream &stream_in() { return _stream; }const ByteStream &stream_in() const { return _stream; }//!@}//! \name Methods that can cause the TCPSender to send a segment//!@{//! \brief A new acknowledgment was receivedvoid ack_received(const WrappingInt32 ackno, const uint16_t window_size);//! \brief Generate an empty-payload segment (useful for creating empty ACK segments)void send_empty_segment();//! \brief create and send segments to fill as much of the window as possiblevoid fill_window();// 每隔几毫秒,您的TCPSender的tick方法将被调用,并带有一个参数,该参数指示自上次调用该方法以来已经过了几毫秒//! \brief Notifies the TCPSender of the passage of timevoid tick(const size_t ms_since_last_tick);//!@}//! \name Accessors//!@{// 已发送但未确认的字节数,SYN和FIN各占1字节//! \brief How many sequence numbers are occupied by segments sent but not yet acknowledged?//! \note count is in "sequence space," i.e. SYN and FIN each count for one byte//! (see TCPSegment::length_in_sequence_space())size_t bytes_in_flight() const;// 连续重传次数//! \brief Number of consecutive retransmissions that have occurred in a rowunsigned int consecutive_retransmissions() const;//! \brief TCPSegments that the TCPSender has enqueued for transmission.//! \note These must be dequeued and sent by the TCPConnection,//! which will need to fill in the fields that are set by the TCPReceiver//! (ackno and window size) before sending.std::queue<TCPSegment> &segments_out() { return _segments_out; }//!@}//! \name What is the next sequence number? (used for testing)//!@{//! \brief absolute seqno for the next byte to be sentuint64_t next_seqno_absolute() const { return _next_seqno; }//! \brief relative seqno for the next byte to be sentWrappingInt32 next_seqno() const { return wrap(_next_seqno, _isn); }//!@}
};
endif // SPONGE_LIBSPONGE_TCP_SENDER_HH
tcp_sender.cc```cpp#include "tcp_sender.hh"#include "tcp_config.hh"#include <random>// Dummy implementation of a TCP sender// For Lab 3, please replace with a real implementation that passes the// automated checks run by `make check_lab3`.template <typename... Targs>void DUMMY_CODE(Targs &&... /* unused */) {}using namespace std;//! \param[in] capacity the capacity of the outgoing byte stream//! \param[in] retx_timeout the initial amount of time to wait before retransmitting the oldest outstanding segment//! \param[in] fixed_isn the Initial Sequence Number to use, if set (otherwise uses a random ISN)TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn): _isn(fixed_isn.value_or(WrappingInt32{random_device()()})), _initial_retransmission_timeout{retx_timeout}, _stream(capacity),_lately_ackno(0), _window_size(0),_bytes_in_flight(0), _segments_outstanding(),_retransmission_timeout{retx_timeout}, _consecutive_retransmission(0),_timer_ms(0), _timer_running(false) {}// _segments_outstanding中所有报文段的和,包括SYN和FINuint64_t TCPSender::bytes_in_flight() const {return _bytes_in_flight;}void TCPSender::fill_window() {if (_next_seqno==0) { // 状态0 CLOSED// 发送不带数据的SYN报文TCPSegment seg;seg.header().syn=true;send_segment(seg);} else if (_next_seqno>0&&bytes_in_flight()==_next_seqno) { // 状态1 SYN_SENT// 在未收到任何ACK之前,假设接收方的窗口大小为1,所以这里不能再发送任何数据} else if (!_stream.eof()&&bytes_in_flight()<_next_seqno) { // 状态2 SYN_ACKED/* 考察区间[_next_seqno, _lately_ackno+_window_size)如果区间不存在,则不能发送新的数据如果window_size==0,则可以发送1个字节的数据*/if (_lately_ackno+_window_size>_next_seqno) {// 有可以发送的数据std::string data_to_send = _stream.read(_lately_ackno+_window_size-_next_seqno);TCPSegment seg;seg.payload() = Buffer(std::move(data_to_send));// 如果stream读完了,且还有余下的空间放置FINif (_stream.eof()&&data_to_send.size()<_lately_ackno+_window_size-_next_seqno) {seg.header().fin=true;}send_segment(seg);}} else if (_stream.eof()&&_next_seqno<_stream.bytes_written()+2) { // 状态3 SYN_ACKED and FIN not Sent// 这种状态下一定只剩下1个FIN没放进TCPSegment的队列// 接收窗口也可能是0吧?if (_lately_ackno+_window_size>_next_seqno) {TCPSegment seg;seg.header().fin=true;send_segment(seg);}} else if (_stream.eof()&&_next_seqno==_stream.bytes_written()+2&&bytes_in_flight()>0) { // 状态4 FIN_SENT// 这里不用发送数据} else if (_stream.eof()&&_next_seqno==_stream.bytes_written()+2&&bytes_in_flight()==0) { // 状态5 FIN_ACKED// 这里不用发送数据} else { // 应该不会到这个分支}}//! \param ackno The remote receiver's ackno (acknowledgment number)//! \param window_size The remote receiver's advertised window sizevoid TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {size_t _ackno=unwrap(ackno, _isn, _lately_ackno);if (_ackno>=_lately_ackno) {// if (_ackno>_lately_ackno) { // 这样的话,部分确认也会重置RTO和清零连续重传次数// _consecutive_retransmission=0;// _retransmission_timeout=_initial_retransmission_timeout;// }_lately_ackno=_ackno;_window_size=window_size;}bool seg_acked=false;while (_segments_outstanding.size()) {TCPSegment seg = _segments_outstanding.front();size_t abs_seqno = unwrap(seg.header().seqno, _isn, _lately_ackno);if (abs_seqno+seg.length_in_sequence_space()<=_lately_ackno) {seg_acked=true;_segments_outstanding.pop();} else {break;}}// 我认为这里_timer_running一定是trueif (_segments_outstanding.empty()) {_timer_running=false; // 关闭定时器} else if (seg_acked) { // 重启定时器_timer_running=true;_timer_ms=0;}// 这样的话,部分确认不会重置RTO和清零连续重传次数吗?if (seg_acked) {_consecutive_retransmission=0;_retransmission_timeout=_initial_retransmission_timeout;}// 如果接收窗口有了新的空间,TCPSender应该再次填充该窗口。// todo}//! \param[in] ms_since_last_tick the number of milliseconds since the last call to this methodvoid TCPSender::tick(const size_t ms_since_last_tick) {// 目前没有考虑窗口大小为0的情况if (_timer_running) {_timer_ms+=ms_since_last_tick;if (_timer_ms>=_retransmission_timeout) { // 定时器超时,需要重传最早的未被确认的报文段_timer_ms=0; // 重启定时器_consecutive_retransmission++; // 重传次数加一_retransmission_timeout<<=1; // RTO加倍if (_segments_outstanding.size()) { // 这个判断应该一定是成立的吧?_segments_out.push(_segments_outstanding.front());}}}}unsigned int TCPSender::consecutive_retransmissions() const {return _consecutive_retransmission;}// 不带SYN、FIN,且不包含数据,只需要设置正确的seqnovoid TCPSender::send_empty_segment() {TCPSegment seg;seg.header().seqno=wrap(_next_seqno, _isn);_segments_out.push(seg);// 不用加入_segments_outstanding,也不用启动定时器}void TCPSender::send_segment(TCPSegment &seg) {seg.header().seqno=wrap(_next_seqno, _isn);_next_seqno+=seg.length_in_sequence_space();_bytes_in_flight+=seg.length_in_sequence_space();_segments_out.push(seg);_segments_outstanding.push(seg);// 每次发送报文段都要检查定时器,如果未启动则启动if (!_timer_running) {_timer_ms=0;_timer_running=true;}}
测试结果

When filling window, treat a ‘0’ window size as equal to ‘1’ but don’t back off RTO
Retx SYN twice at the right times, then ack
DEBUG
- 问题一:在收到新的ack后,从_segments_outstanding中删除已经被完全确认的报文段,此时_bytes_in_flight这个变量应该更新,我忘记了
- 问题二:接收窗口很大,且待发送的数据很多,这种情况下,调用fill_window()一次,就会产生多个TCPSegment,我实现fill_window每次只会产生1个TCPSegment
- 问题三:在收到新的ack后,需要手动调用fill_window(),上面的代码中没有加
- 目前,我把这个调用和RTO的初始化、连续重传次数的清零放在了一起
- 只有在至少完全确认一个报文段后,才会调用fill_window(),如果只是部分确认,就不会调用fill_window()
- 其它可能的写法
- 无论是否收到新的ack,在ack_received中一定会调用fill_window()
- 收到新的ack,部分确认了最早已发送但未确认的报文段,那么就调用fill_window()
- 问题四:接收窗口为0的情况未处理
- 我一开始接受窗口的初始化值为0(在接收任何接收方返回的报文段之前,应假设为1),现在修改为了1
- 在收到新的ack,更新了_window_size=0后
- 超时后不再增加重传次数,也不会RTO加倍
- 在fill_window()方法中,如果读到_window_size=0,应将其当做1,所以设置了一个叫window_size的局部变量
第二阶段
代码
在修改了第一阶段的问题后,代码如下
tcp_sender.hh(无改动)
#ifndef SPONGE_LIBSPONGE_TCP_SENDER_HH#define SPONGE_LIBSPONGE_TCP_SENDER_HH#include "byte_stream.hh"#include "tcp_config.hh"#include "tcp_segment.hh"#include "wrapping_integers.hh"#include <functional>#include <queue>//! \brief The "sender" part of a TCP implementation.//! Accepts a ByteStream, divides it up into segments and sends the//! segments, keeps track of which segments are still in-flight,//! maintains the Retransmission Timer, and retransmits in-flight//! segments if the retransmission timer expires.class TCPSender {private:// 发送方随机选择的ISN//! our initial sequence number, the number for our SYN.WrappingInt32 _isn;// 把需要发送的报文段放到这个队列//! outbound queue of segments that the TCPSender wants sentstd::queue<TCPSegment> _segments_out{};// 这个变量保存的是RTO的初始值//! retransmission timer for the connectionunsigned int _initial_retransmission_timeout;// 还没有发送的数据//! outgoing stream of bytes that have not yet been sentByteStream _stream;// 未发送数据的第一个字节的绝对序号,包括SYN和FIN//! the (absolute) sequence number for the next byte to be sentuint64_t _next_seqno{0};size_t _lately_ackno = 0; // 从接收方返回的最大的acknosize_t _window_size = 0; // 和_lately_ackno配套的window sizesize_t _bytes_in_flight = 0; // 是_segments_outstanding中的序号空间大小std::queue<TCPSegment> _segments_outstanding{}; // 已发送但未确认的报文段size_t _retransmission_timeout = 0; // RTOsize_t _consecutive_retransmission = 0; // 连续重传次数size_t _timer_ms = 0; // 计时器启动后经过的毫秒数bool _timer_running = false; // 计时器是否启动void send_segment(TCPSegment &seg); // 发送已经填写了必要信息的报文段,这个方法内部只会再向TCPSegment中填入seqnopublic://! Initialize a TCPSender// 构造函数的参数:容量、初始超时时间(RTO的初始值)、ISNTCPSender(const size_t capacity = TCPConfig::DEFAULT_CAPACITY,const uint16_t retx_timeout = TCPConfig::TIMEOUT_DFLT,const std::optional<WrappingInt32> fixed_isn = {});//! \name "Input" interface for the writer//!@{ByteStream &stream_in() { return _stream; }const ByteStream &stream_in() const { return _stream; }//!@}//! \name Methods that can cause the TCPSender to send a segment//!@{//! \brief A new acknowledgment was receivedvoid ack_received(const WrappingInt32 ackno, const uint16_t window_size);//! \brief Generate an empty-payload segment (useful for creating empty ACK segments)void send_empty_segment();//! \brief create and send segments to fill as much of the window as possiblevoid fill_window();// 每隔几毫秒,您的TCPSender的tick方法将被调用,并带有一个参数,该参数指示自上次调用该方法以来已经过了几毫秒//! \brief Notifies the TCPSender of the passage of timevoid tick(const size_t ms_since_last_tick);//!@}//! \name Accessors//!@{// 已发送但未确认的字节数,SYN和FIN各占1字节//! \brief How many sequence numbers are occupied by segments sent but not yet acknowledged?//! \note count is in "sequence space," i.e. SYN and FIN each count for one byte//! (see TCPSegment::length_in_sequence_space())size_t bytes_in_flight() const;// 连续重传次数//! \brief Number of consecutive retransmissions that have occurred in a rowunsigned int consecutive_retransmissions() const;//! \brief TCPSegments that the TCPSender has enqueued for transmission.//! \note These must be dequeued and sent by the TCPConnection,//! which will need to fill in the fields that are set by the TCPReceiver//! (ackno and window size) before sending.std::queue<TCPSegment> &segments_out() { return _segments_out; }//!@}//! \name What is the next sequence number? (used for testing)//!@{//! \brief absolute seqno for the next byte to be sentuint64_t next_seqno_absolute() const { return _next_seqno; }//! \brief relative seqno for the next byte to be sentWrappingInt32 next_seqno() const { return wrap(_next_seqno, _isn); }//!@}};#endif // SPONGE_LIBSPONGE_TCP_SENDER_HH
tcp_sender.cc
#include "tcp_sender.hh"#include "tcp_config.hh"#include <random>#include <iostream>#include <cmath>// Dummy implementation of a TCP sender// For Lab 3, please replace with a real implementation that passes the// automated checks run by `make check_lab3`.template <typename... Targs>void DUMMY_CODE(Targs &&... /* unused */) {}using namespace std;//! \param[in] capacity the capacity of the outgoing byte stream//! \param[in] retx_timeout the initial amount of time to wait before retransmitting the oldest outstanding segment//! \param[in] fixed_isn the Initial Sequence Number to use, if set (otherwise uses a random ISN)TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn): _isn(fixed_isn.value_or(WrappingInt32{random_device()()})), _initial_retransmission_timeout{retx_timeout}, _stream(capacity),_lately_ackno(0), _window_size(1) // fixed,_bytes_in_flight(0), _segments_outstanding(),_retransmission_timeout{retx_timeout}, _consecutive_retransmission(0),_timer_ms(0), _timer_running(false) {}// _segments_outstanding中所有报文段的和,包括SYN和FINuint64_t TCPSender::bytes_in_flight() const {return _bytes_in_flight;}void TCPSender::fill_window() {if (_next_seqno==0) { // 状态0 CLOSED// 发送不带数据的SYN报文TCPSegment seg;seg.header().syn=true;send_segment(seg);std::cout<<"~~~~~~~~~~~~~~~~~~~~~~~~ start with RTO: "<<_retransmission_timeout<<std::endl;} else if (_next_seqno>0&&bytes_in_flight()==_next_seqno) { // 状态1 SYN_SENT// 在未收到任何ACK之前,假设接收方的窗口大小为1,所以这里不能再发送任何数据} else if (!_stream.eof()&&bytes_in_flight()<_next_seqno) { // 状态2 SYN_ACKED/* 考察区间[_next_seqno, _lately_ackno+_window_size)如果区间不存在,则不能发送新的数据如果window_size==0,则可以发送1个字节的数据*/size_t window_size=_window_size==0?1:_window_size; // fixedif (_lately_ackno+window_size>_next_seqno&&!_stream.buffer_empty()) { // fixed// 注意MAX_PAYLOAD_SIZE=1452字节 fixed// 有可以发送的数据size_t size_to_send = _lately_ackno+window_size-_next_seqno;while (size_to_send>0&&!_stream.buffer_empty()) {size_t size_to_read = std::min(size_to_send, TCPConfig::MAX_PAYLOAD_SIZE);// std::string data_to_send = _stream.read(_lately_ackno+window_size-_next_seqno);std::string data_to_send = _stream.read(size_to_read);size_to_send-=data_to_send.size();std::cout<<"****** size_to_read: "<<size_to_read<<" data_to_send.size(): "<<data_to_send.size();TCPSegment seg;seg.payload() = Buffer(std::move(data_to_send));// 如果stream读完了,且还有余下的空间放置FINif (_stream.eof()&&size_to_send>0) {// 这样的话循环条件一定不成立,所以size_to_send也没必要再-1了seg.header().fin=true;}send_segment(seg);std::cout<<" _segments_out.size(): "<<_segments_out.size()<<std::endl;if (_segments_out.size()) {std::cout<<_segments_out.front().payload().str()<<std::endl;}}}} else if (_stream.eof()&&_next_seqno<_stream.bytes_written()+2) { // 状态3 SYN_ACKED and FIN not Sent// 这种状态下一定只剩下1个FIN没放进TCPSegment的队列// 接收窗口也可能是0吧?size_t window_size=_window_size==0?1:_window_size; // fixedif (_lately_ackno+window_size>_next_seqno) {TCPSegment seg;seg.header().fin=true;send_segment(seg);}} else if (_stream.eof()&&_next_seqno==_stream.bytes_written()+2&&bytes_in_flight()>0) { // 状态4 FIN_SENT// 这里不用发送数据} else if (_stream.eof()&&_next_seqno==_stream.bytes_written()+2&&bytes_in_flight()==0) { // 状态5 FIN_ACKED// 这里不用发送数据} else { // 应该不会到这个分支}}//! \param ackno The remote receiver's ackno (acknowledgment number)//! \param window_size The remote receiver's advertised window sizevoid TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {size_t _ackno=unwrap(ackno, _isn, _lately_ackno);std::cout<<"***** _ackno: "<<_ackno<<" _lately_ackno: "<<_lately_ackno<<std::endl;if (_ackno>=_lately_ackno) {// if (_ackno>_lately_ackno) { // 这样的话,部分确认也会重置RTO和清零连续重传次数// _consecutive_retransmission=0;// _retransmission_timeout=_initial_retransmission_timeout;// }_lately_ackno=_ackno;_window_size=window_size; // 这里window_size可能是0}bool seg_acked=false;while (_segments_outstanding.size()) {TCPSegment seg = _segments_outstanding.front();size_t abs_seqno = unwrap(seg.header().seqno, _isn, _lately_ackno);if (abs_seqno+seg.length_in_sequence_space()<=_lately_ackno) {seg_acked=true;_segments_outstanding.pop();_bytes_in_flight-=seg.length_in_sequence_space(); // fixed} else {break;}}// 我认为这里_timer_running一定是trueif (_segments_outstanding.empty()) {_timer_running=false; // 关闭定时器} else if (seg_acked) { // 重启定时器_timer_running=true;_timer_ms=0;}// 这样的话,部分确认不会重置RTO和清零连续重传次数,也不会调用fill_window()if (seg_acked) {_consecutive_retransmission=0;_retransmission_timeout=_initial_retransmission_timeout;fill_window(); // fixed}// 如果接收窗口有了新的空间,TCPSender应该再次填充该窗口。// todo}//! \param[in] ms_since_last_tick the number of milliseconds since the last call to this methodvoid TCPSender::tick(const size_t ms_since_last_tick) {// 目前没有考虑窗口大小为0的情况if (_timer_running) {_timer_ms+=ms_since_last_tick;if (_timer_ms>=_retransmission_timeout) { // 定时器超时,需要重传最早的未被确认的报文段_timer_ms=0; // 重启定时器// if (_window_size>0||(_next_seqno>0&&bytes_in_flight()==_next_seqno)) { // fixedif (_window_size>0) { // fixed_consecutive_retransmission++; // 重传次数加一_retransmission_timeout<<=1; // RTO加倍}if (_segments_outstanding.size()) { // 这个判断应该一定是成立的吧?_segments_out.push(_segments_outstanding.front());}}}}unsigned int TCPSender::consecutive_retransmissions() const {return _consecutive_retransmission;}// 不带SYN、FIN,且不包含数据,只需要设置正确的seqnovoid TCPSender::send_empty_segment() {TCPSegment seg;seg.header().seqno=wrap(_next_seqno, _isn);_segments_out.push(seg);// 不用加入_segments_outstanding,也不用启动定时器}void TCPSender::send_segment(TCPSegment &seg) {seg.header().seqno=wrap(_next_seqno, _isn);_next_seqno+=seg.length_in_sequence_space();_bytes_in_flight+=seg.length_in_sequence_space();_segments_out.push(seg);_segments_outstanding.push(seg);// 每次发送报文段都要检查定时器,如果未启动则启动if (!_timer_running) {_timer_ms=0;_timer_running=true;}}
测试结果

第三阶段
说明
- 引入Lab4的代码后,新增了测试,Lab3又有样例没过,经过DEBUG,发现是少考虑一种特殊情况
- Impossible ackno (beyond next seqno) is ignored
- 只需要在ack_received方法前加入相关判断即可
代码
#ifndef SPONGE_LIBSPONGE_TCP_SENDER_HH#define SPONGE_LIBSPONGE_TCP_SENDER_HH#include "byte_stream.hh"#include "tcp_config.hh"#include "tcp_segment.hh"#include "wrapping_integers.hh"#include <functional>#include <queue>//! \brief The "sender" part of a TCP implementation.//! Accepts a ByteStream, divides it up into segments and sends the//! segments, keeps track of which segments are still in-flight,//! maintains the Retransmission Timer, and retransmits in-flight//! segments if the retransmission timer expires.class TCPSender {private:// 发送方随机选择的ISN//! our initial sequence number, the number for our SYN.WrappingInt32 _isn;// 把需要发送的报文段放到这个队列//! outbound queue of segments that the TCPSender wants sentstd::queue<TCPSegment> _segments_out{};// 这个变量保存的是RTO的初始值//! retransmission timer for the connectionunsigned int _initial_retransmission_timeout;// 还没有发送的数据//! outgoing stream of bytes that have not yet been sentByteStream _stream;// 未发送数据的第一个字节的绝对序号,包括SYN和FIN//! the (absolute) sequence number for the next byte to be sentuint64_t _next_seqno{0};size_t _lately_ackno = 0; // 从接收方返回的最大的acknosize_t _window_size = 0; // 和_lately_ackno配套的window sizesize_t _bytes_in_flight = 0; // 是_segments_outstanding中的序号空间大小std::queue<TCPSegment> _segments_outstanding{}; // 已发送但未确认的报文段size_t _retransmission_timeout = 0; // RTOsize_t _consecutive_retransmission = 0; // 连续重传次数size_t _timer_ms = 0; // 计时器启动后经过的毫秒数bool _timer_running = false; // 计时器是否启动void send_segment(TCPSegment &seg); // 发送已经填写了必要信息的报文段,这个方法内部只会再向TCPSegment中填入seqnopublic://! Initialize a TCPSender// 构造函数的参数:容量、初始超时时间(RTO的初始值)、ISNTCPSender(const size_t capacity = TCPConfig::DEFAULT_CAPACITY,const uint16_t retx_timeout = TCPConfig::TIMEOUT_DFLT,const std::optional<WrappingInt32> fixed_isn = {});//! \name "Input" interface for the writer//!@{ByteStream &stream_in() { return _stream; }const ByteStream &stream_in() const { return _stream; }//!@}//! \name Methods that can cause the TCPSender to send a segment//!@{//! \brief A new acknowledgment was receivedvoid ack_received(const WrappingInt32 ackno, const uint16_t window_size);//! \brief Generate an empty-payload segment (useful for creating empty ACK segments)void send_empty_segment();//! \brief create and send segments to fill as much of the window as possiblevoid fill_window();// 每隔几毫秒,您的TCPSender的tick方法将被调用,并带有一个参数,该参数指示自上次调用该方法以来已经过了几毫秒//! \brief Notifies the TCPSender of the passage of timevoid tick(const size_t ms_since_last_tick);//!@}//! \name Accessors//!@{// 已发送但未确认的字节数,SYN和FIN各占1字节//! \brief How many sequence numbers are occupied by segments sent but not yet acknowledged?//! \note count is in "sequence space," i.e. SYN and FIN each count for one byte//! (see TCPSegment::length_in_sequence_space())size_t bytes_in_flight() const;// 连续重传次数//! \brief Number of consecutive retransmissions that have occurred in a rowunsigned int consecutive_retransmissions() const;//! \brief TCPSegments that the TCPSender has enqueued for transmission.//! \note These must be dequeued and sent by the TCPConnection,//! which will need to fill in the fields that are set by the TCPReceiver//! (ackno and window size) before sending.std::queue<TCPSegment> &segments_out() { return _segments_out; }//!@}//! \name What is the next sequence number? (used for testing)//!@{//! \brief absolute seqno for the next byte to be sentuint64_t next_seqno_absolute() const { return _next_seqno; }//! \brief relative seqno for the next byte to be sentWrappingInt32 next_seqno() const { return wrap(_next_seqno, _isn); }//!@}};#endif // SPONGE_LIBSPONGE_TCP_SENDER_HH#include "tcp_sender.hh"#include "tcp_config.hh"#include <random>#include <iostream>#include <cmath>// Dummy implementation of a TCP sender// For Lab 3, please replace with a real implementation that passes the// automated checks run by `make check_lab3`.template <typename... Targs>void DUMMY_CODE(Targs &&... /* unused */) {}using namespace std;//! \param[in] capacity the capacity of the outgoing byte stream//! \param[in] retx_timeout the initial amount of time to wait before retransmitting the oldest outstanding segment//! \param[in] fixed_isn the Initial Sequence Number to use, if set (otherwise uses a random ISN)TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn): _isn(fixed_isn.value_or(WrappingInt32{random_device()()})), _initial_retransmission_timeout{retx_timeout}, _stream(capacity),_lately_ackno(0), _window_size(1) // fixed,_bytes_in_flight(0), _segments_outstanding(),_retransmission_timeout{retx_timeout}, _consecutive_retransmission(0),_timer_ms(0), _timer_running(false) {}// _segments_outstanding中所有报文段的和,包括SYN和FINuint64_t TCPSender::bytes_in_flight() const {return _bytes_in_flight;}void TCPSender::fill_window() {if (_next_seqno==0) { // 状态0 CLOSED// 发送不带数据的SYN报文TCPSegment seg;seg.header().syn=true;send_segment(seg);std::cout<<"~~~~~~~~~~~~~~~~~~~~~~~~ start with RTO: "<<_retransmission_timeout<<std::endl;} else if (_next_seqno>0&&bytes_in_flight()==_next_seqno) { // 状态1 SYN_SENT// 在未收到任何ACK之前,假设接收方的窗口大小为1,所以这里不能再发送任何数据} else if (!_stream.eof()&&bytes_in_flight()<_next_seqno) { // 状态2 SYN_ACKED/* 考察区间[_next_seqno, _lately_ackno+_window_size)如果区间不存在,则不能发送新的数据如果window_size==0,则可以发送1个字节的数据*/size_t window_size=_window_size==0?1:_window_size; // fixedif (_lately_ackno+window_size>_next_seqno&&!_stream.buffer_empty()) { // fixed// 注意MAX_PAYLOAD_SIZE=1452字节 fixed// 有可以发送的数据size_t size_to_send = _lately_ackno+window_size-_next_seqno;while (size_to_send>0&&!_stream.buffer_empty()) {size_t size_to_read = std::min(size_to_send, TCPConfig::MAX_PAYLOAD_SIZE);// std::string data_to_send = _stream.read(_lately_ackno+window_size-_next_seqno);std::string data_to_send = _stream.read(size_to_read);size_to_send-=data_to_send.size();std::cout<<"****** size_to_read: "<<size_to_read<<" data_to_send.size(): "<<data_to_send.size();TCPSegment seg;seg.payload() = Buffer(std::move(data_to_send));// 如果stream读完了,且还有余下的空间放置FINif (_stream.eof()&&size_to_send>0) {// 这样的话循环条件一定不成立,所以size_to_send也没必要再-1了seg.header().fin=true;}send_segment(seg);std::cout<<" _segments_out.size(): "<<_segments_out.size()<<std::endl;if (_segments_out.size()) {std::cout<<_segments_out.front().payload().str()<<std::endl;}}}} else if (_stream.eof()&&_next_seqno<_stream.bytes_written()+2) { // 状态3 SYN_ACKED and FIN not Sent// 这种状态下一定只剩下1个FIN没放进TCPSegment的队列// 接收窗口也可能是0吧?size_t window_size=_window_size==0?1:_window_size; // fixedif (_lately_ackno+window_size>_next_seqno) {TCPSegment seg;seg.header().fin=true;send_segment(seg);}} else if (_stream.eof()&&_next_seqno==_stream.bytes_written()+2&&bytes_in_flight()>0) { // 状态4 FIN_SENT// 这里不用发送数据} else if (_stream.eof()&&_next_seqno==_stream.bytes_written()+2&&bytes_in_flight()==0) { // 状态5 FIN_ACKED// 这里不用发送数据} else { // 应该不会到这个分支}}//! \param ackno The remote receiver's ackno (acknowledgment number)//! \param window_size The remote receiver's advertised window sizevoid TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {size_t _ackno=unwrap(ackno, _isn, _lately_ackno);// Impossible ackno (beyond next seqno) is ignoredif (_ackno>_next_seqno) {return;}std::cout<<"***** _ackno: "<<_ackno<<" _lately_ackno: "<<_lately_ackno<<std::endl;if (_ackno>=_lately_ackno) {// if (_ackno>_lately_ackno) { // 这样的话,部分确认也会重置RTO和清零连续重传次数// _consecutive_retransmission=0;// _retransmission_timeout=_initial_retransmission_timeout;// }_lately_ackno=_ackno;_window_size=window_size; // 这里window_size可能是0}bool seg_acked=false;while (_segments_outstanding.size()) {TCPSegment seg = _segments_outstanding.front();size_t abs_seqno = unwrap(seg.header().seqno, _isn, _lately_ackno);if (abs_seqno+seg.length_in_sequence_space()<=_lately_ackno) {seg_acked=true;_segments_outstanding.pop();_bytes_in_flight-=seg.length_in_sequence_space(); // fixed} else {break;}}// 我认为这里_timer_running一定是trueif (_segments_outstanding.empty()) {_timer_running=false; // 关闭定时器} else if (seg_acked) { // 重启定时器_timer_running=true;_timer_ms=0;}// 这样的话,部分确认不会重置RTO和清零连续重传次数,也不会调用fill_window()if (seg_acked) {_consecutive_retransmission=0;_retransmission_timeout=_initial_retransmission_timeout;fill_window(); // fixed}// 如果接收窗口有了新的空间,TCPSender应该再次填充该窗口。// todo}//! \param[in] ms_since_last_tick the number of milliseconds since the last call to this methodvoid TCPSender::tick(const size_t ms_since_last_tick) {// 目前没有考虑窗口大小为0的情况if (_timer_running) {_timer_ms+=ms_since_last_tick;if (_timer_ms>=_retransmission_timeout) { // 定时器超时,需要重传最早的未被确认的报文段_timer_ms=0; // 重启定时器// if (_window_size>0||(_next_seqno>0&&bytes_in_flight()==_next_seqno)) { // fixedif (_window_size>0) { // fixed_consecutive_retransmission++; // 重传次数加一_retransmission_timeout<<=1; // RTO加倍}if (_segments_outstanding.size()) { // 这个判断应该一定是成立的吧?_segments_out.push(_segments_outstanding.front());}}}}unsigned int TCPSender::consecutive_retransmissions() const {return _consecutive_retransmission;}// 不带SYN、FIN,且不包含数据,只需要设置正确的seqnovoid TCPSender::send_empty_segment() {TCPSegment seg;seg.header().seqno=wrap(_next_seqno, _isn);_segments_out.push(seg);// 不用加入_segments_outstanding,也不用启动定时器}void TCPSender::send_segment(TCPSegment &seg) {seg.header().seqno=wrap(_next_seqno, _isn);_next_seqno+=seg.length_in_sequence_space();_bytes_in_flight+=seg.length_in_sequence_space();_segments_out.push(seg);_segments_outstanding.push(seg);// 每次发送报文段都要检查定时器,如果未启动则启动if (!_timer_running) {_timer_ms=0;_timer_running=true;}}
