BLOCKING
C
https://github.com/chenshuo/muduo/blob/master/examples/ace/ttcp/ttcp_blocking.cc
https://github.com/chenshuo/muduo/blob/master/examples/ace/ttcp/common.h
#include <string>
#include <stdint.h>
// Command-line settings shared by the ttcp sender and receiver.
// Every field starts out zero/false/empty.
struct Options
{
  uint16_t port = 0;      // TCP port to connect to or listen on
  int length = 0;         // payload bytes per message
  int number = 0;         // how many messages to exchange
  bool transmit = false;  // run as the sending side
  bool receive = false;   // run as the receiving side
  bool nodelay = false;   // TCP_NODELAY requested
  std::string host;       // peer host name, used when transmitting

  // Kept user-provided so the type stays a non-aggregate, as before.
  Options() {}
};
// Parses the command line into *opt. `opt` is an out-parameter and must be a
// pointer: callers pass `&options` and read the fields back afterwards.
// A false return means the caller should not run transmit()/receive().
bool parseCommandLine(int argc, char* argv[], Options* opt);

// Resolves host:port to an IPv4 socket address.
// NOTE(review): the definition is not shown here; judging by the name it
// terminates the process on failure rather than returning -- confirm.
struct sockaddr_in resolveOrDie(const char* host, uint16_t port);
// Header sent once by the transmitter at the start of a session: how many
// payload messages follow and how long each one is.
// Packed: no alignment padding, members laid out back to back, so the
// on-the-wire size is exactly two int32_t (8 bytes).
struct SessionMessage
{
  int32_t number;  // count of payload messages
  int32_t length;  // payload bytes per message
} __attribute__ ((__packed__));
// One length-prefixed payload message.
struct PayloadMessage
{
  int32_t length;  // number of bytes stored in data
  // Zero-length trailing array (C idiom, compiler extension in C++): the real
  // size is decided at run time by allocating sizeof(int32_t) + length bytes
  // and using the storage that follows `length` as the payload.
  char data[0];
};
// Sender side: connects to opt.host:opt.port, sends a SessionMessage, then
// sends opt.number payloads of opt.length bytes, reading an ack after each.
void transmit(const Options& opt);
// Receiver side: accepts one connection on opt.port, reads the SessionMessage,
// then reads each payload and answers with its length as the ack.
void receive(const Options& opt);
```c#include "examples/ace/ttcp/common.h"#include "muduo/base/Timestamp.h"#include "muduo/base/Types.h"#undef NDEBUG#include <assert.h>#include <errno.h>#include <stdio.h>#include <unistd.h>#include <netinet/in.h>#include <arpa/inet.h>static int acceptOrDie(uint16_t port){int listenfd = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);assert(listenfd >= 0);int yes = 1;if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes))){perror("setsockopt");exit(1);}struct sockaddr_in addr;muduo::memZero(&addr, sizeof(addr));addr.sin_family = AF_INET;addr.sin_port = htons(port);addr.sin_addr.s_addr = INADDR_ANY;if (bind(listenfd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr))){perror("bind");exit(1);}if (listen(listenfd, 5)){perror("listen");exit(1);}struct sockaddr_in peer_addr;muduo::memZero(&peer_addr, sizeof(peer_addr));socklen_t addrlen = 0;int sockfd = ::accept(listenfd, reinterpret_cast<struct sockaddr*>(&peer_addr), &addrlen);if (sockfd < 0){perror("accept");exit(1);}::close(listenfd);return sockfd;}static int write_n(int sockfd, const void* buf, int length){int written = 0;while (written < length){ssize_t nw = ::write(sockfd, static_cast<const char*>(buf) + written, length - written);if (nw > 0){written += static_cast<int>(nw);}else if (nw == 0){break; // EOF}else if (errno != EINTR){perror("write");break;}}return written;}static int read_n(int sockfd, void* buf, int length){int nread = 0;while (nread < length){ssize_t nr = ::read(sockfd, static_cast<char*>(buf) + nread, length - nread);if (nr > 0){nread += static_cast<int>(nr);}else if (nr == 0){break; // EOF}else if (errno != EINTR){perror("read");break;}}return nread;}void transmit(const Options& opt){struct sockaddr_in addr = resolveOrDie(opt.host.c_str(), opt.port);printf("connecting to %s:%d\n", inet_ntoa(addr.sin_addr), opt.port);int sockfd = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);assert(sockfd >= 0);int ret = ::connect(sockfd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr));if 
(ret){perror("connect");printf("Unable to connect %s\n", opt.host.c_str());::close(sockfd);return;}printf("connected\n");muduo::Timestamp start(muduo::Timestamp::now());struct SessionMessage sessionMessage = { 0, 0 };sessionMessage.number = htonl(opt.number);sessionMessage.length = htonl(opt.length);if (write_n(sockfd, &sessionMessage, sizeof(sessionMessage)) != sizeof(sessionMessage)){perror("write SessionMessage");exit(1);}const int total_len = static_cast<int>(sizeof(int32_t) + opt.length);PayloadMessage* payload = static_cast<PayloadMessage*>(::malloc(total_len));assert(payload);payload->length = htonl(opt.length);for (int i = 0; i < opt.length; ++i){payload->data[i] = "0123456789ABCDEF"[i % 16];}double total_mb = 1.0 * opt.length * opt.number / 1024 / 1024;printf("%.3f MiB in total\n", total_mb);for (int i = 0; i < opt.number; ++i){int nw = write_n(sockfd, payload, total_len);assert(nw == total_len);int ack = 0;int nr = read_n(sockfd, &ack, sizeof(ack));assert(nr == sizeof(ack));ack = ntohl(ack);assert(ack == opt.length);}::free(payload);::close(sockfd);double elapsed = timeDifference(muduo::Timestamp::now(), start);printf("%.3f seconds\n%.3f MiB/s\n", elapsed, total_mb / elapsed);}void receive(const Options& opt){// 接受连接int sockfd = acceptOrDie(opt.port);struct SessionMessage sessionMessage = { 0, 0 };// 读取8个字节长度if (read_n(sockfd, &sessionMessage, sizeof(sessionMessage)) != sizeof(sessionMessage)){perror("read SessionMessage");exit(1);}// 网络字节序转为本机字节序sessionMessage.number = ntohl(sessionMessage.number);sessionMessage.length = ntohl(sessionMessage.length);printf("receive number = %d\nreceive length = %d\n",sessionMessage.number, sessionMessage.length);// 缓冲区: 4个字节 + 消息的长度const int total_len = static_cast<int>(sizeof(int32_t) + sessionMessage.length);// 如果是外网使用: 接受到超长的消息, malloc会分配很大的内存, 可能造成DDos攻击; 可以通过限制length的长度来防止DDos攻击PayloadMessage* payload = static_cast<PayloadMessage*>(::malloc(total_len)); // 不定长消息assert(payload);for (int i = 0; i < 
sessionMessage.number; ++i){payload->length = 0;if (read_n(sockfd, &payload->length, sizeof(payload->length)) != sizeof(payload->length)){perror("read length");exit(1);}payload->length = ntohl(payload->length);assert(payload->length == sessionMessage.length); // 验证消息长度// 读取数据if (read_n(sockfd, payload->data, payload->length) != payload->length){perror("read payload data");exit(1);}// 构造响应int32_t ack = htonl(payload->length);if (write_n(sockfd, &ack, sizeof(ack)) != sizeof(ack)){perror("write ack");exit(1);}}::free(payload);::close(sockfd);}
[common.h](https://github.com/chenshuo/muduo/blob/master/examples/ace/ttcp/common.h)
NON-BLOCKING
C++
https://github.com/chenshuo/recipes/blob/master/tpc/bin/ttcp.cc
#include "Acceptor.h"
#include "InetAddress.h"
#include "TcpStream.h"

#include <iostream>
#include <memory>
#include <string>

#include <boost/program_options.hpp>

#include <arpa/inet.h>
#include <assert.h>
#include <limits.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>

// Command-line settings shared by the sender and the receiver.
struct Options
{
  uint16_t port;
  int length;
  int number;
  bool transmit, receive, nodelay;
  std::string host;
  Options()
    : port(0), length(0), number(0),
      transmit(false), receive(false), nodelay(false)
  {
  }
};

// Session header sent once by the transmitter; packed so it is exactly
// two int32_t on the wire.
struct SessionMessage
{
  int32_t number;
  int32_t length;
} __attribute__ ((__packed__));

// Length-prefixed payload; `data` is a zero-length trailing array whose real
// size is fixed by the allocation (sizeof(int32_t) + length bytes).
struct PayloadMessage
{
  int32_t length;
  char data[0];
};

// Wall-clock time in seconds, with microsecond resolution.
double now()
{
  struct timeval tv = { 0, 0 };
  gettimeofday(&tv, nullptr);
  return tv.tv_sec + tv.tv_usec / 1000000.0;
}

// True when a message count / payload length pair is usable: both are
// non-negative and sizeof(int32_t) + length still fits in an int.
static bool validSizes(int32_t number, int32_t length)
{
  return number >= 0 && length >= 0 &&
         length <= INT_MAX - static_cast<int>(sizeof(int32_t));
}

// Fills *opt from the command line and echoes the chosen settings.
// Returns false when help was requested or when -t / -r were not given
// exactly one of the two; the caller should then do nothing.
// FIXME: rewrite with getopt(3).
bool parseCommandLine(int argc, char* argv[], Options* opt)
{
  namespace po = boost::program_options;

  po::options_description desc("Allowed options");
  desc.add_options()
      ("help,h", "Help")
      ("port,p", po::value<uint16_t>(&opt->port)->default_value(5001), "TCP port")
      ("length,l", po::value<int>(&opt->length)->default_value(65536), "Buffer length")
      ("number,n", po::value<int>(&opt->number)->default_value(8192), "Number of buffers")
      ("trans,t", po::value<std::string>(&opt->host), "Transmit")
      ("recv,r", "Receive")
      ("nodelay,D", "set TCP_NODELAY")
      ;

  po::variables_map vm;
  po::store(po::parse_command_line(argc, argv, desc), vm);
  po::notify(vm);

  opt->transmit = vm.count("trans") > 0;
  opt->receive = vm.count("recv") > 0;
  opt->nodelay = vm.count("nodelay") > 0;
  if (vm.count("help"))
  {
    std::cout << desc << std::endl;
    return false;
  }

  if (opt->transmit == opt->receive)
  {
    printf("either -t or -r must be specified.\n");
    return false;
  }

  printf("port = %d\n", opt->port);
  if (opt->transmit)
  {
    printf("buffer length = %d\n", opt->length);
    printf("number of buffers = %d\n", opt->number);
  }
  else
  {
    printf("accepting...\n");
  }
  return true;
}

// Sender: resolves and connects to opt.host:opt.port, announces the session,
// then sends opt.number payloads of opt.length bytes, waiting for the ack of
// each before sending the next. Prints elapsed time and throughput.
void transmit(const Options& opt)
{
  // A negative length would make the buffer smaller than its own header.
  if (!validSizes(opt.number, opt.length))
  {
    printf("invalid buffer length or number of buffers\n");
    return;
  }

  // Must be an object, not `InetAddress addr();` (that declares a function).
  // The port-only constructor is a placeholder; resolve() overwrites it.
  InetAddress addr(opt.port);
  if (!InetAddress::resolve(opt.host.c_str(), opt.port, &addr))
  {
    printf("Unable to resolve %s\n", opt.host.c_str());
    return;
  }

  printf("connecting to %s\n", addr.toIpPort().c_str());
  TcpStreamPtr stream(TcpStream::connect(addr));
  if (!stream)
  {
    printf("Unable to connect %s\n", addr.toIpPort().c_str());
    perror("");
    return;
  }

  if (opt.nodelay)
  {
    stream->setTcpNoDelay(true);
  }
  printf("connected\n");
  double start = now();

  struct SessionMessage sessionMessage = { 0, 0 };
  sessionMessage.number = htonl(opt.number);
  sessionMessage.length = htonl(opt.length);
  const int session_len = static_cast<int>(sizeof(sessionMessage));
  if (stream->sendAll(&sessionMessage, session_len) != session_len)
  {
    perror("write SessionMessage");
    return;
  }

  // 4-byte length prefix followed by the payload bytes.
  const int total_len = static_cast<int>(sizeof(int32_t)) + opt.length;
  PayloadMessage* payload = static_cast<PayloadMessage*>(::malloc(total_len));
  std::unique_ptr<PayloadMessage, void (*)(void*)> freeIt(payload, ::free);
  // Checked explicitly: assert() vanishes when NDEBUG is defined.
  if (!payload)
  {
    perror("malloc");
    return;
  }
  payload->length = htonl(opt.length);
  for (int i = 0; i < opt.length; ++i)
  {
    payload->data[i] = "0123456789ABCDEF"[i % 16];
  }

  double total_mb = 1.0 * opt.length * opt.number / 1024 / 1024;
  printf("%.3f MiB in total\n", total_mb);

  for (int i = 0; i < opt.number; ++i)
  {
    if (stream->sendAll(payload, total_len) != total_len)
    {
      perror("write payload");
      return;
    }

    // The receiver acknowledges every message with the payload length.
    int ack = 0;
    const int ack_len = static_cast<int>(sizeof(ack));
    if (stream->receiveAll(&ack, ack_len) != ack_len)
    {
      perror("read ack");
      return;
    }
    ack = ntohl(ack);
    if (ack != opt.length)
    {
      printf("unexpected ack %d, expected %d\n", ack, opt.length);
      return;
    }
  }

  double elapsed = now() - start;
  printf("%.3f seconds\n%.3f MiB/s\n", elapsed, total_mb / elapsed);
}

// Receiver: accepts one connection on opt.port, reads the session header,
// then reads each payload and answers with its length as the ack.
// Prints elapsed time and throughput at the end.
void receive(const Options& opt)
{
  Acceptor acceptor(InetAddress(opt.port));
  TcpStreamPtr stream(acceptor.accept());
  if (!stream)
  {
    return;
  }

  struct SessionMessage sessionMessage = { 0, 0 };
  const int session_len = static_cast<int>(sizeof(sessionMessage));
  if (stream->receiveAll(&sessionMessage, session_len) != session_len)
  {
    perror("read SessionMessage");
    return;
  }

  sessionMessage.number = ntohl(sessionMessage.number);
  sessionMessage.length = ntohl(sessionMessage.length);
  printf("receive buffer length = %d\nreceive number of buffers = %d\n",
         sessionMessage.length, sessionMessage.number);

  // These values come from the peer. A negative length would yield a buffer
  // smaller than the 4-byte length field that is written into it below.
  if (!validSizes(sessionMessage.number, sessionMessage.length))
  {
    printf("invalid SessionMessage from peer\n");
    return;
  }

  double total_mb = 1.0 * sessionMessage.number * sessionMessage.length / 1024 / 1024;
  printf("%.3f MiB in total\n", total_mb);

  // Buffer: 4-byte length prefix + payload bytes.
  const int total_len = static_cast<int>(sizeof(int32_t)) + sessionMessage.length;
  PayloadMessage* payload = static_cast<PayloadMessage*>(::malloc(total_len));
  std::unique_ptr<PayloadMessage, void (*)(void*)> freeIt(payload, ::free);
  if (!payload)
  {
    perror("malloc");
    return;
  }

  double start = now();
  const int prefix_len = static_cast<int>(sizeof(payload->length));
  for (int i = 0; i < sessionMessage.number; ++i)
  {
    payload->length = 0;
    if (stream->receiveAll(&payload->length, prefix_len) != prefix_len)
    {
      perror("read length");
      return;
    }
    payload->length = ntohl(payload->length);
    // Must be a real check, not an assert: a larger per-message length would
    // make the read below run past the end of the buffer.
    if (payload->length != sessionMessage.length)
    {
      printf("unexpected payload length %d, expected %d\n",
             payload->length, sessionMessage.length);
      return;
    }

    if (stream->receiveAll(payload->data, payload->length) != payload->length)
    {
      perror("read payload data");
      return;
    }

    int32_t ack = htonl(payload->length);
    const int ack_len = static_cast<int>(sizeof(ack));
    if (stream->sendAll(&ack, ack_len) != ack_len)
    {
      perror("write ack");
      return;
    }
  }

  double elapsed = now() - start;
  printf("%.3f seconds\n%.3f MiB/s\n", elapsed, total_mb / elapsed);
}

// Entry point: parse options, then run as sender or receiver.
int main(int argc, char* argv[])
{
  Options options;
  if (parseCommandLine(argc, argv, &options))
  {
    if (options.transmit)
    {
      transmit(options);
    }
    else if (options.receive)
    {
      receive(options);
    }
    else
    {
      // parseCommandLine() guarantees exactly one mode is set.
      assert(0);
    }
  }
}
[ttcp.cc](https://github.com/chenshuo/recipes/blob/master/tpc/bin/ttcp.cc)
