BLOCKING

C

https://github.com/chenshuo/muduo/blob/master/examples/ace/ttcp/ttcp_blocking.cc
https://github.com/chenshuo/muduo/blob/master/examples/ace/ttcp/common.h

  • 服务端 ```c

    #pragma once

#include <string>

#include <stdint.h>

// Command-line settings shared by the ttcp sender and receiver.
// Every numeric field starts at 0 and every flag at false; host starts empty.
struct Options
{
  uint16_t port;     // TCP port to connect to or listen on
  int length;        // size in bytes of each payload buffer
  int number;        // how many payload buffers are exchanged
  bool transmit;     // act as the sending side
  bool receive;      // act as the receiving side
  bool nodelay;      // request TCP_NODELAY
  std::string host;  // peer host name used when transmitting

  Options()
    : port(0),
      length(0),
      number(0),
      transmit(false),
      receive(false),
      nodelay(false)
  {
  }
};

// Parses the command line into *opt. The bool result tells the caller whether to
// proceed (callers run transmit/receive only when it is true).
// argv is an array of C strings and opt is an out-parameter, so both are pointers.
bool parseCommandLine(int argc, char* argv[], Options* opt);

// Produces the IPv4 socket address for host:port.
// NOTE(review): judging by the name, resolution failure terminates the process;
// the definition is not part of this listing — confirm.
struct sockaddr_in resolveOrDie(const char* host, uint16_t port);

// Fixed-size header that opens a session: how many payloads follow and how long
// each one is. Both fields travel in network byte order.
struct SessionMessage
{
  int32_t number;  // count of payload messages in the session
  int32_t length;  // byte length of every payload
} __attribute__ ((__packed__));  // no alignment padding: the struct is exactly 8 bytes

// One length-prefixed frame: a 4-byte length followed by that many data bytes.
struct PayloadMessage
{
  int32_t length;  // number of bytes stored in data
  // Zero-length trailing array (GNU extension): the real size of the last member
  // is decided at run time by how much memory the caller allocates for the struct.
  char data[0];
};

// Sending side: connects to opt.host:opt.port, sends a SessionMessage and then
// opt.number payloads of opt.length bytes, waiting for an ack after each one.
void transmit(const Options& opt);

// Receiving side: accepts one connection on opt.port, reads the SessionMessage and
// acknowledges every payload it receives.
void receive(const Options& opt);

  1. ```c
  2. #include "examples/ace/ttcp/common.h"
  3. #include "muduo/base/Timestamp.h"
  4. #include "muduo/base/Types.h"
  5. #undef NDEBUG
  6. #include <assert.h>
  7. #include <errno.h>
  8. #include <stdio.h>
  9. #include <unistd.h>
  10. #include <netinet/in.h>
  11. #include <arpa/inet.h>
  12. static int acceptOrDie(uint16_t port)
  13. {
  14. int listenfd = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  15. assert(listenfd >= 0);
  16. int yes = 1;
  17. if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
  18. {
  19. perror("setsockopt");
  20. exit(1);
  21. }
  22. struct sockaddr_in addr;
  23. muduo::memZero(&addr, sizeof(addr));
  24. addr.sin_family = AF_INET;
  25. addr.sin_port = htons(port);
  26. addr.sin_addr.s_addr = INADDR_ANY;
  27. if (bind(listenfd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)))
  28. {
  29. perror("bind");
  30. exit(1);
  31. }
  32. if (listen(listenfd, 5))
  33. {
  34. perror("listen");
  35. exit(1);
  36. }
  37. struct sockaddr_in peer_addr;
  38. muduo::memZero(&peer_addr, sizeof(peer_addr));
  39. socklen_t addrlen = 0;
  40. int sockfd = ::accept(listenfd, reinterpret_cast<struct sockaddr*>(&peer_addr), &addrlen);
  41. if (sockfd < 0)
  42. {
  43. perror("accept");
  44. exit(1);
  45. }
  46. ::close(listenfd);
  47. return sockfd;
  48. }
  // Writes exactly `length` bytes from `buf` to `sockfd`, retrying after short
  // writes and EINTR. Returns the number of bytes actually written, which is less
  // than `length` only if write() reported an error (printed via perror) or wrote
  // nothing.
  static int write_n(int sockfd, const void* buf, int length)
  {
    const char* const bytes = static_cast<const char*>(buf);
    int sent = 0;
    while (sent < length)
    {
      const ssize_t rc = ::write(sockfd, bytes + sent, length - sent);
      if (rc < 0)
      {
        if (errno == EINTR)
        {
          continue;  // interrupted by a signal before anything was written: retry
        }
        perror("write");
        break;
      }
      if (rc == 0)
      {
        break;  // nothing was accepted; give up instead of spinning
      }
      sent += static_cast<int>(rc);
    }
    return sent;
  }
  // Reads exactly `length` bytes from `sockfd` into `buf`, retrying after short
  // reads and EINTR. Returns the number of bytes actually read, which is less than
  // `length` only if the peer closed the connection or read() reported an error
  // (printed via perror).
  static int read_n(int sockfd, void* buf, int length)
  {
    char* const bytes = static_cast<char*>(buf);
    int received = 0;
    while (received < length)
    {
      const ssize_t rc = ::read(sockfd, bytes + received, length - received);
      if (rc < 0)
      {
        if (errno == EINTR)
        {
          continue;  // interrupted by a signal before any data arrived: retry
        }
        perror("read");
        break;
      }
      if (rc == 0)
      {
        break;  // end of stream
      }
      received += static_cast<int>(rc);
    }
    return received;
  }
  93. void transmit(const Options& opt)
  94. {
  95. struct sockaddr_in addr = resolveOrDie(opt.host.c_str(), opt.port);
  96. printf("connecting to %s:%d\n", inet_ntoa(addr.sin_addr), opt.port);
  97. int sockfd = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  98. assert(sockfd >= 0);
  99. int ret = ::connect(sockfd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr));
  100. if (ret)
  101. {
  102. perror("connect");
  103. printf("Unable to connect %s\n", opt.host.c_str());
  104. ::close(sockfd);
  105. return;
  106. }
  107. printf("connected\n");
  108. muduo::Timestamp start(muduo::Timestamp::now());
  109. struct SessionMessage sessionMessage = { 0, 0 };
  110. sessionMessage.number = htonl(opt.number);
  111. sessionMessage.length = htonl(opt.length);
  112. if (write_n(sockfd, &sessionMessage, sizeof(sessionMessage)) != sizeof(sessionMessage))
  113. {
  114. perror("write SessionMessage");
  115. exit(1);
  116. }
  117. const int total_len = static_cast<int>(sizeof(int32_t) + opt.length);
  118. PayloadMessage* payload = static_cast<PayloadMessage*>(::malloc(total_len));
  119. assert(payload);
  120. payload->length = htonl(opt.length);
  121. for (int i = 0; i < opt.length; ++i)
  122. {
  123. payload->data[i] = "0123456789ABCDEF"[i % 16];
  124. }
  125. double total_mb = 1.0 * opt.length * opt.number / 1024 / 1024;
  126. printf("%.3f MiB in total\n", total_mb);
  127. for (int i = 0; i < opt.number; ++i)
  128. {
  129. int nw = write_n(sockfd, payload, total_len);
  130. assert(nw == total_len);
  131. int ack = 0;
  132. int nr = read_n(sockfd, &ack, sizeof(ack));
  133. assert(nr == sizeof(ack));
  134. ack = ntohl(ack);
  135. assert(ack == opt.length);
  136. }
  137. ::free(payload);
  138. ::close(sockfd);
  139. double elapsed = timeDifference(muduo::Timestamp::now(), start);
  140. printf("%.3f seconds\n%.3f MiB/s\n", elapsed, total_mb / elapsed);
  141. }
  142. void receive(const Options& opt)
  143. {
  144. // 接受连接
  145. int sockfd = acceptOrDie(opt.port);
  146. struct SessionMessage sessionMessage = { 0, 0 };
  147. // 读取8个字节长度
  148. if (read_n(sockfd, &sessionMessage, sizeof(sessionMessage)) != sizeof(sessionMessage))
  149. {
  150. perror("read SessionMessage");
  151. exit(1);
  152. }
  153. // 网络字节序转为本机字节序
  154. sessionMessage.number = ntohl(sessionMessage.number);
  155. sessionMessage.length = ntohl(sessionMessage.length);
  156. printf("receive number = %d\nreceive length = %d\n",
  157. sessionMessage.number, sessionMessage.length);
  158. // 缓冲区: 4个字节 + 消息的长度
  159. const int total_len = static_cast<int>(sizeof(int32_t) + sessionMessage.length);
  160. // 如果是外网使用: 接受到超长的消息, malloc会分配很大的内存, 可能造成DDos攻击; 可以通过限制length的长度来防止DDos攻击
  161. PayloadMessage* payload = static_cast<PayloadMessage*>(::malloc(total_len)); // 不定长消息
  162. assert(payload);
  163. for (int i = 0; i < sessionMessage.number; ++i)
  164. {
  165. payload->length = 0;
  166. if (read_n(sockfd, &payload->length, sizeof(payload->length)) != sizeof(payload->length))
  167. {
  168. perror("read length");
  169. exit(1);
  170. }
  171. payload->length = ntohl(payload->length);
  172. assert(payload->length == sessionMessage.length); // 验证消息长度
  173. // 读取数据
  174. if (read_n(sockfd, payload->data, payload->length) != payload->length)
  175. {
  176. perror("read payload data");
  177. exit(1);
  178. }
  179. // 构造响应
  180. int32_t ack = htonl(payload->length);
  181. if (write_n(sockfd, &ack, sizeof(ack)) != sizeof(ack))
  182. {
  183. perror("write ack");
  184. exit(1);
  185. }
  186. }
  187. ::free(payload);
  188. ::close(sockfd);
  189. }

[common.h](https://github.com/chenshuo/muduo/blob/master/examples/ace/ttcp/common.h)

NON-BLOCKING

C++

https://github.com/chenshuo/recipes/blob/master/tpc/bin/ttcp.cc

  1. #include "Acceptor.h"
  2. #include "InetAddress.h"
  3. #include "TcpStream.h"
  4. #include <iostream>
  5. #include <boost/program_options.hpp>
  6. #include <sys/time.h>
  // Settings filled in from the command line.
  // Every numeric field starts at 0 and every flag at false; host starts empty.
  struct Options
  {
    uint16_t port;     // TCP port
    int length;        // buffer length in bytes
    int number;        // number of buffers
    bool transmit;     // run as the sending side
    bool receive;      // run as the receiving side
    bool nodelay;      // set TCP_NODELAY on the connection
    std::string host;  // host to connect to when transmitting

    Options()
      : port(0),
        length(0),
        number(0),
        transmit(false),
        receive(false),
        nodelay(false)
    {
    }
  };
  // Header exchanged once at the start of a session: the number of payload
  // buffers that will follow and the length of each. Packed, so it occupies
  // exactly two int32_t fields with no padding.
  struct __attribute__ ((__packed__)) SessionMessage
  {
    int32_t number;  // how many payloads follow
    int32_t length;  // byte length of each payload
  };
  // A length-prefixed frame. `data` is a zero-length trailing array (GNU
  // extension): callers allocate sizeof(int32_t) + length bytes and address the
  // body through it.
  struct PayloadMessage
  {
    int32_t length;  // number of body bytes
    char data[0];    // body starts here
  };
  // Current time of day from gettimeofday(), as seconds with a fractional
  // microsecond part.
  double now()
  {
    struct timeval current = { 0, 0 };
    gettimeofday(&current, NULL);
    const double fraction = current.tv_usec / 1000000.0;
    return current.tv_sec + fraction;
  }
  36. // FIXME: rewrite with getopt(3).
  37. bool parseCommandLine(int argc, char* argv[], Options* opt)
  38. {
  39. namespace po = boost::program_options;
  40. po::options_description desc("Allowed options");
  41. desc.add_options()
  42. ("help,h", "Help")
  43. ("port,p", po::value<uint16_t>(&opt->port)->default_value(5001), "TCP port")
  44. ("length,l", po::value<int>(&opt->length)->default_value(65536), "Buffer length")
  45. ("number,n", po::value<int>(&opt->number)->default_value(8192), "Number of buffers")
  46. ("trans,t", po::value<std::string>(&opt->host), "Transmit")
  47. ("recv,r", "Receive")
  48. ("nodelay,D", "set TCP_NODELAY")
  49. ;
  50. po::variables_map vm;
  51. po::store(po::parse_command_line(argc, argv, desc), vm);
  52. po::notify(vm);
  53. opt->transmit = vm.count("trans");
  54. opt->receive = vm.count("recv");
  55. opt->nodelay = vm.count("nodelay");
  56. if (vm.count("help"))
  57. {
  58. std::cout << desc << std::endl;
  59. return false;
  60. }
  61. if (opt->transmit == opt->receive)
  62. {
  63. printf("either -t or -r must be specified.\n");
  64. return false;
  65. }
  66. printf("port = %d\n", opt->port);
  67. if (opt->transmit)
  68. {
  69. printf("buffer length = %d\n", opt->length);
  70. printf("number of buffers = %d\n", opt->number);
  71. }
  72. else
  73. {
  74. printf("accepting...\n");
  75. }
  76. return true;
  77. }
  78. void transmit(const Options& opt)
  79. {
  80. InetAddress addr();
  81. if (!InetAddress::resolve(opt.host.c_str(), opt.port, &addr))
  82. {
  83. printf("Unable to resolve %s\n", opt.host.c_str());
  84. return;
  85. }
  86. printf("connecting to %s\n", addr.toIpPort().c_str());
  87. TcpStreamPtr stream(TcpStream::connect(addr));
  88. if (!stream)
  89. {
  90. printf("Unable to connect %s\n", addr.toIpPort().c_str());
  91. perror("");
  92. return;
  93. }
  94. if (opt.nodelay)
  95. {
  96. stream->setTcpNoDelay(true);
  97. }
  98. printf("connected\n");
  99. double start = now();
  100. struct SessionMessage sessionMessage = { 0, 0 };
  101. sessionMessage.number = htonl(opt.number);
  102. sessionMessage.length = htonl(opt.length);
  103. if (stream->sendAll(&sessionMessage, sizeof(sessionMessage)) != sizeof(sessionMessage))
  104. {
  105. perror("write SessionMessage");
  106. return;
  107. }
  108. const int total_len = sizeof(int32_t) + opt.length;
  109. PayloadMessage* payload = static_cast<PayloadMessage*>(::malloc(total_len));
  110. std::unique_ptr<PayloadMessage, void (*)(void*)> freeIt(payload, ::free);
  111. assert(payload);
  112. payload->length = htonl(opt.length);
  113. for (int i = 0; i < opt.length; ++i)
  114. {
  115. payload->data[i] = "0123456789ABCDEF"[i % 16];
  116. }
  117. double total_mb = 1.0 * opt.length * opt.number / 1024 / 1024;
  118. printf("%.3f MiB in total\n", total_mb);
  119. for (int i = 0; i < opt.number; ++i)
  120. {
  121. int nw = stream->sendAll(payload, total_len);
  122. assert(nw == total_len);
  123. int ack = 0;
  124. int nr = stream->receiveAll(&ack, sizeof(ack));
  125. assert(nr == sizeof(ack));
  126. ack = ntohl(ack);
  127. assert(ack == opt.length);
  128. }
  129. double elapsed = now() - start;
  130. printf("%.3f seconds\n%.3f MiB/s\n", elapsed, total_mb / elapsed);
  131. }
  132. void receive(const Options& opt)
  133. {
  134. Acceptor acceptor(InetAddress(opt.port));
  135. TcpStreamPtr stream(acceptor.accept());
  136. if (!stream)
  137. {
  138. return;
  139. }
  140. struct SessionMessage sessionMessage = { 0, 0 };
  141. if (stream->receiveAll(&sessionMessage, sizeof(sessionMessage)) != sizeof(sessionMessage))
  142. {
  143. perror("read SessionMessage");
  144. return;
  145. }
  146. sessionMessage.number = ntohl(sessionMessage.number);
  147. sessionMessage.length = ntohl(sessionMessage.length);
  148. printf("receive buffer length = %d\nreceive number of buffers = %d\n",
  149. sessionMessage.length, sessionMessage.number);
  150. double total_mb = 1.0 * sessionMessage.number * sessionMessage.length / 1024 / 1024;
  151. printf("%.3f MiB in total\n", total_mb);
  152. const int total_len = sizeof(int32_t) + sessionMessage.length;
  153. PayloadMessage* payload = static_cast<PayloadMessage*>(::malloc(total_len));
  154. std::unique_ptr<PayloadMessage, void (*)(void*)> freeIt(payload, ::free);
  155. assert(payload);
  156. double start = now();
  157. for (int i = 0; i < sessionMessage.number; ++i)
  158. {
  159. payload->length = 0;
  160. if (stream->receiveAll(&payload->length, sizeof(payload->length)) != sizeof(payload->length))
  161. {
  162. perror("read length");
  163. return;
  164. }
  165. payload->length = ntohl(payload->length);
  166. assert(payload->length == sessionMessage.length);
  167. if (stream->receiveAll(payload->data, payload->length) != payload->length)
  168. {
  169. perror("read payload data");
  170. return;
  171. }
  172. int32_t ack = htonl(payload->length);
  173. if (stream->sendAll(&ack, sizeof(ack)) != sizeof(ack))
  174. {
  175. perror("write ack");
  176. return;
  177. }
  178. }
  179. double elapsed = now() - start;
  180. printf("%.3f seconds\n%.3f MiB/s\n", elapsed, total_mb / elapsed);
  181. }
  182. int main(int argc, char* argv[])
  183. {
  184. Options options;
  185. if (parseCommandLine(argc, argv, &options))
  186. {
  187. if (options.transmit)
  188. {
  189. transmit(options);
  190. }
  191. else if (options.receive)
  192. {
  193. receive(options);
  194. }
  195. else
  196. {
  197. assert(0);
  198. }
  199. }
  200. }

[ttcp.cc](https://github.com/chenshuo/recipes/blob/master/tpc/bin/ttcp.cc)