3) 内存管理与buffer封装

  1. 在完成网络框架之前,我们先把必须的内存管理和buffer的封装完成。

这里我们先创建一个io_buf类,主要用来封装基本的buffer结构。然后用一个buf_pool来管理全部的buffer集合。

3.1 io_buf 内存块

lars_reactor/include/io_buf.h

  1. #pragma once
  2. /*
  3. 定义一个 buffer存放数据的结构
  4. * */
  5. class io_buf {
  6. public:
  7. //构造,创建一个io_buf对象
  8. io_buf(int size);
  9. //清空数据
  10. void clear();
  11. //将已经处理过的数据,清空,将未处理的数据提前至数据首地址
  12. void adjust();
  13. //将其他io_buf对象数据考本到自己中
  14. void copy(const io_buf *other);
  15. //处理长度为len的数据,移动head和修正length
  16. void pop(int len);
  17. //如果存在多个buffer,是采用链表的形式链接起来
  18. io_buf *next;
  19. //当前buffer的缓存容量大小
  20. int capacity;
  21. //当前buffer有效数据长度
  22. int length;
  23. //未处理数据的头部位置索引
  24. int head;
  25. //当前io_buf所保存的数据地址
  26. char *data;
  27. };

对应的io_buf实现的文件,如下

lars_reactor/src/io_buf.cpp

  1. #include <stdio.h>
  2. #include <assert.h>
  3. #include <string.h>
  4. #include "io_buf.h"
  5. //构造,创建一个io_buf对象
  6. io_buf::io_buf(int size):
  7. capacity(size),
  8. length(0),
  9. head(0),
  10. next(NULL)
  11. {
  12. data = new char[size];
  13. assert(data);
  14. }
  15. //清空数据
  16. void io_buf::clear() {
  17. length = head = 0;
  18. }
  19. //将已经处理过的数据,清空,将未处理的数据提前至数据首地址
  20. void io_buf::adjust() {
  21. if (head != 0) {
  22. if (length != 0) {
  23. memmove(data, data+head, length);
  24. }
  25. head = 0;
  26. }
  27. }
  28. //将其他io_buf对象数据考本到自己中
  29. void io_buf::copy(const io_buf *other) {
  30. memcpy(data, other->data + other->head, other->length);
  31. head = 0;
  32. length = other->length;
  33. }
  34. //处理长度为len的数据,移动head和修正length
  35. void io_buf::pop(int len) {
  36. length -= len;
  37. head += len;
  38. }
  1. 这里主要要注意io_buf的两个索引值lengthhead,一个是当前buffer的有效内存长度,haed则为可用的有效长度首数据位置。 capacityio_buf的总容量空间大小。
  2. 所以每次`pop()`则是弹出已经处理了多少,那么buffer剩下的内存就接下来需要处理的。
  3. 然而`adjust()`则是从新重置io_buf,将所有数据都重新变成未处理状态。
  4. `clear()`则是将lengthhead0,这里没有提供`delete`真是删除物理内存的方法,因为这里的buffer设计是不需要清理的,接下来是用一个`buf_pool`来管理全部未被使用的`io_buf`集合。而且`buf_pool`的管理的内存是程序开始预开辟的,不会做清理工作.

3.2 buf_pool 内存池

  1. 接下来我们看看内存池的设计.

lars_reactor/include/buf_pool.h

  1. #pragma once
  2. #include <ext/hash_map>
  3. #include "io_buf.h"
  4. typedef __gnu_cxx::hash_map<int, io_buf*> pool_t;
  5. enum MEM_CAP {
  6. m4K = 4096,
  7. m16K = 16384,
  8. m64K = 65536,
  9. m256K = 262144,
  10. m1M = 1048576,
  11. m4M = 4194304,
  12. m8M = 8388608
  13. };
  14. //总内存池最大限制 单位是Kb 所以目前限制是 5GB
  15. #define EXTRA_MEM_LIMIT (5U *1024 *1024)
  16. /*
  17. * 定义buf内存池
  18. * 设计为单例
  19. * */
  20. class buf_pool
  21. {
  22. public:
  23. //初始化单例对象
  24. static void init() {
  25. //创建单例
  26. _instance = new buf_pool();
  27. }
  28. //获取单例方法
  29. static buf_pool *instance() {
  30. //保证init方法在这个进程执行中 只被执行一次
  31. pthread_once(&_once, init);
  32. return _instance;
  33. }
  34. //开辟一个io_buf
  35. io_buf *alloc_buf(int N);
  36. io_buf *alloc_buf() { return alloc_buf(m4K); }
  37. //重置一个io_buf
  38. void revert(io_buf *buffer);
  39. private:
  40. buf_pool();
  41. //拷贝构造私有化
  42. buf_pool(const buf_pool&);
  43. const buf_pool& operator=(const buf_pool&);
  44. //所有buffer的一个map集合句柄
  45. pool_t _pool;
  46. //总buffer池的内存大小 单位为KB
  47. uint64_t _total_mem;
  48. //单例对象
  49. static buf_pool *_instance;
  50. //用于保证创建单例的init方法只执行一次的锁
  51. static pthread_once_t _once;
  52. //用户保护内存池链表修改的互斥锁
  53. static pthread_mutex_t _mutex;
  54. };
  1. 首先`buf_pool`采用单例的方式进行设计。因为系统希望仅有一个内存池管理模块。这里内存池用一个`__gnu_cxx::hash_map<int, io_buf*>`map类型进行管理,其中key是每个组内存的空间容量,参考
  1. enum MEM_CAP {
  2. m4K = 4096,
  3. m16K = 16384,
  4. m64K = 65536,
  5. m256K = 262144,
  6. m1M = 1048576,
  7. m4M = 4194304,
  8. m8M = 8388608
  9. };
  1. 其中每个key下面挂在一个`io_buf`链表。而且`buf_pool`预先会给map下的每个key的内存组开辟好一定数量的内存块。然后上层用户在使用的时候每次取出一个内存块,就会将该内存块从该内存组摘掉。当然使用完就放回来。如果不够使用会额外开辟,也有最大的内存限制,在宏`EXTRA_MEM_LIMIT`中。

具体的buf_pool实现如下:

lars_reactor/src/buf_pool.cpp

  1. #include "buf_pool.h"
  2. #include <assert.h>
  3. //单例对象
  4. buf_pool * buf_pool::_instance = NULL;
  5. //用于保证创建单例的init方法只执行一次的锁
  6. pthread_once_t buf_pool::_once = PTHREAD_ONCE_INIT;
  7. //用户保护内存池链表修改的互斥锁
  8. pthread_mutex_t buf_pool::_mutex = PTHREAD_MUTEX_INITIALIZER;
  9. //构造函数 主要是预先开辟一定量的空间
  10. //这里buf_pool是一个hash,每个key都是不同空间容量
  11. //对应的value是一个io_buf集合的链表
  12. //buf_pool --> [m4K] -- io_buf-io_buf-io_buf-io_buf...
  13. // [m16K] -- io_buf-io_buf-io_buf-io_buf...
  14. // [m64K] -- io_buf-io_buf-io_buf-io_buf...
  15. // [m256K] -- io_buf-io_buf-io_buf-io_buf...
  16. // [m1M] -- io_buf-io_buf-io_buf-io_buf...
  17. // [m4M] -- io_buf-io_buf-io_buf-io_buf...
  18. // [m8M] -- io_buf-io_buf-io_buf-io_buf...
  19. buf_pool::buf_pool():_total_mem(0)
  20. {
  21. io_buf *prev;
  22. //----> 开辟4K buf 内存池
  23. _pool[m4K] = new io_buf(m4K);
  24. if (_pool[m4K] == NULL) {
  25. fprintf(stderr, "new io_buf m4K error");
  26. exit(1);
  27. }
  28. prev = _pool[m4K];
  29. //4K的io_buf 预先开辟5000个,约20MB供开发者使用
  30. for (int i = 1; i < 5000; i ++) {
  31. prev->next = new io_buf(m4K);
  32. if (prev->next == NULL) {
  33. fprintf(stderr, "new io_buf m4K error");
  34. exit(1);
  35. }
  36. prev = prev->next;
  37. }
  38. _total_mem += 4 * 5000;
  39. //----> 开辟16K buf 内存池
  40. _pool[m16K] = new io_buf(m16K);
  41. if (_pool[m16K] == NULL) {
  42. fprintf(stderr, "new io_buf m16K error");
  43. exit(1);
  44. }
  45. prev = _pool[m16K];
  46. //16K的io_buf 预先开辟1000个,约16MB供开发者使用
  47. for (int i = 1; i < 1000; i ++) {
  48. prev->next = new io_buf(m16K);
  49. if (prev->next == NULL) {
  50. fprintf(stderr, "new io_buf m16K error");
  51. exit(1);
  52. }
  53. prev = prev->next;
  54. }
  55. _total_mem += 16 * 1000;
  56. //----> 开辟64K buf 内存池
  57. _pool[m64K] = new io_buf(m64K);
  58. if (_pool[m64K] == NULL) {
  59. fprintf(stderr, "new io_buf m64K error");
  60. exit(1);
  61. }
  62. prev = _pool[m64K];
  63. //64K的io_buf 预先开辟500个,约32MB供开发者使用
  64. for (int i = 1; i < 500; i ++) {
  65. prev->next = new io_buf(m64K);
  66. if (prev->next == NULL) {
  67. fprintf(stderr, "new io_buf m64K error");
  68. exit(1);
  69. }
  70. prev = prev->next;
  71. }
  72. _total_mem += 64 * 500;
  73. //----> 开辟256K buf 内存池
  74. _pool[m256K] = new io_buf(m256K);
  75. if (_pool[m256K] == NULL) {
  76. fprintf(stderr, "new io_buf m256K error");
  77. exit(1);
  78. }
  79. prev = _pool[m256K];
  80. //256K的io_buf 预先开辟200个,约50MB供开发者使用
  81. for (int i = 1; i < 200; i ++) {
  82. prev->next = new io_buf(m256K);
  83. if (prev->next == NULL) {
  84. fprintf(stderr, "new io_buf m256K error");
  85. exit(1);
  86. }
  87. prev = prev->next;
  88. }
  89. _total_mem += 256 * 200;
  90. //----> 开辟1M buf 内存池
  91. _pool[m1M] = new io_buf(m1M);
  92. if (_pool[m1M] == NULL) {
  93. fprintf(stderr, "new io_buf m1M error");
  94. exit(1);
  95. }
  96. prev = _pool[m1M];
  97. //1M的io_buf 预先开辟50个,约50MB供开发者使用
  98. for (int i = 1; i < 50; i ++) {
  99. prev->next = new io_buf(m1M);
  100. if (prev->next == NULL) {
  101. fprintf(stderr, "new io_buf m1M error");
  102. exit(1);
  103. }
  104. prev = prev->next;
  105. }
  106. _total_mem += 1024 * 50;
  107. //----> 开辟4M buf 内存池
  108. _pool[m4M] = new io_buf(m4M);
  109. if (_pool[m4M] == NULL) {
  110. fprintf(stderr, "new io_buf m4M error");
  111. exit(1);
  112. }
  113. prev = _pool[m4M];
  114. //4M的io_buf 预先开辟20个,约80MB供开发者使用
  115. for (int i = 1; i < 20; i ++) {
  116. prev->next = new io_buf(m4M);
  117. if (prev->next == NULL) {
  118. fprintf(stderr, "new io_buf m4M error");
  119. exit(1);
  120. }
  121. prev = prev->next;
  122. }
  123. _total_mem += 4096 * 20;
  124. //----> 开辟8M buf 内存池
  125. _pool[m8M] = new io_buf(m8M);
  126. if (_pool[m8M] == NULL) {
  127. fprintf(stderr, "new io_buf m8M error");
  128. exit(1);
  129. }
  130. prev = _pool[m8M];
  131. //8M的io_buf 预先开辟10个,约80MB供开发者使用
  132. for (int i = 1; i < 10; i ++) {
  133. prev->next = new io_buf(m8M);
  134. if (prev->next == NULL) {
  135. fprintf(stderr, "new io_buf m8M error");
  136. exit(1);
  137. }
  138. prev = prev->next;
  139. }
  140. _total_mem += 8192 * 10;
  141. }
  142. //开辟一个io_buf
  143. //1 如果上层需要N个字节的大小的空间,找到与N最接近的buf hash组,取出,
  144. //2 如果该组已经没有节点使用,可以额外申请
  145. //3 总申请长度不能够超过最大的限制大小 EXTRA_MEM_LIMIT
  146. //4 如果有该节点需要的内存块,直接取出,并且将该内存块从pool摘除
  147. io_buf *buf_pool::alloc_buf(int N)
  148. {
  149. //1 找到N最接近哪hash 组
  150. int index;
  151. if (N <= m4K) {
  152. index = m4K;
  153. }
  154. else if (N <= m16K) {
  155. index = m16K;
  156. }
  157. else if (N <= m64K) {
  158. index = m64K;
  159. }
  160. else if (N <= m256K) {
  161. index = m256K;
  162. }
  163. else if (N <= m1M) {
  164. index = m1M;
  165. }
  166. else if (N <= m4M) {
  167. index = m4M;
  168. }
  169. else if (N <= m8M) {
  170. index = m8M;
  171. }
  172. else {
  173. return NULL;
  174. }
  175. //2 如果该组已经没有,需要额外申请,那么需要加锁保护
  176. pthread_mutex_lock(&_mutex);
  177. if (_pool[index] == NULL) {
  178. if (_total_mem + index/1024 >= EXTRA_MEM_LIMIT) {
  179. //当前的开辟的空间已经超过最大限制
  180. fprintf(stderr, "already use too many memory!\n");
  181. exit(1);
  182. }
  183. io_buf *new_buf = new io_buf(index);
  184. if (new_buf == NULL) {
  185. fprintf(stderr, "new io_buf error\n");
  186. exit(1);
  187. }
  188. _total_mem += index/1024;
  189. pthread_mutex_unlock(&_mutex);
  190. return new_buf;
  191. }
  192. //3 从pool中摘除该内存块
  193. io_buf *target = _pool[index];
  194. _pool[index] = target->next;
  195. pthread_mutex_unlock(&_mutex);
  196. target->next = NULL;
  197. return target;
  198. }
  199. //重置一个io_buf,将一个buf 上层不再使用,或者使用完成之后,需要将该buf放回pool中
  200. void buf_pool::revert(io_buf *buffer)
  201. {
  202. //每个buf的容量都是固定的 在hash的key中取值
  203. int index = buffer->capacity;
  204. //重置io_buf中的内置位置指针
  205. buffer->length = 0;
  206. buffer->head = 0;
  207. pthread_mutex_lock(&_mutex);
  208. //找到对应的hash组 buf首届点地址
  209. assert(_pool.find(index) != _pool.end());
  210. //将buffer插回链表头部
  211. buffer->next = _pool[index];
  212. _pool[index] = buffer;
  213. pthread_mutex_unlock(&_mutex);
  214. }
  1. 其中,`buf_pool`构造函数中实现了内存池的hash预开辟内存工作,具体的数据结构如下
  1. //buf_pool --> [m4K] --> io_buf-io_buf-io_buf-io_buf...
  2. // [m16K] --> io_buf-io_buf-io_buf-io_buf...
  3. // [m64K] --> io_buf-io_buf-io_buf-io_buf...
  4. // [m256K] --> io_buf-io_buf-io_buf-io_buf...
  5. // [m1M] --> io_buf-io_buf-io_buf-io_buf...
  6. // [m4M] --> io_buf-io_buf-io_buf-io_buf...
  7. // [m8M] --> io_buf-io_buf-io_buf-io_buf...
  1. `alloc_buf()`方法,是调用者从内存池中取出一块内存,如果最匹配的内存块存在,则返回,并将该块内存从buf_pool中摘除掉,如果没有则开辟一个内存出来。 `revert()`方法则是将已经使用完的`io_buf`重新放回`buf_pool`中。

3.3 读写buffer机制

  1. 那么接下来我们就需要实现一个专门用来读(输入)数据的`input_buf`和专门用来写(输出)数据的`output_buf`类了。由于这两个人都应该拥有一些`io_buf`的特性,所以我们先定义一个基础的父类`reactor_buf`

A. reactor_buf类

lars_reactor/include/reactor_buf.h

  1. #pragma once
  2. #include "io_buf.h"
  3. #include "buf_pool.h"
  4. #include <assert.h>
  5. #include <unistd.h>
  6. /*
  7. * 给业务层提供的最后tcp_buffer结构
  8. * */
  9. class reactor_buf {
  10. public:
  11. reactor_buf();
  12. ~reactor_buf();
  13. const int length() const;
  14. void pop(int len);
  15. void clear();
  16. protected:
  17. io_buf *_buf;
  18. };
  1. 这个的作用就是将io_buf作为自己的一个成员,然后做了一些包装。具体方法实现如下。

lars_reactor/src/reactor.cpp

  1. #include "reactor_buf.h"
  2. #include <sys/ioctl.h>
  3. #include <string.h>
  4. reactor_buf::reactor_buf()
  5. {
  6. _buf = NULL;
  7. }
  8. reactor_buf::~reactor_buf()
  9. {
  10. clear();
  11. }
  12. const int reactor_buf::length() const
  13. {
  14. return _buf != NULL? _buf->length : 0;
  15. }
  16. void reactor_buf::pop(int len)
  17. {
  18. assert(_buf != NULL && len <= _buf->length);
  19. _buf->pop(len);
  20. //当此时_buf的可用长度已经为0
  21. if(_buf->length == 0) {
  22. //将_buf重新放回buf_pool中
  23. buf_pool::instance()->revert(_buf);
  24. _buf = NULL;
  25. }
  26. }
  27. void reactor_buf::clear()
  28. {
  29. if (_buf != NULL) {
  30. //将_buf重新放回buf_pool中
  31. buf_pool::instance()->revert(_buf);
  32. _buf = NULL;
  33. }
  34. }

B. input_buf类

  1. 接下来就可以集成`reactor_buf`类实现`input_buf`类的设计了。

lars_reactor/include/reactor_buf.h

  1. //读(输入) 缓存buffer
  2. class input_buf : public reactor_buf
  3. {
  4. public:
  5. //从一个fd中读取数据到reactor_buf中
  6. int read_data(int fd);
  7. //取出读到的数据
  8. const char *data() const;
  9. //重置缓冲区
  10. void adjust();
  11. };
  1. 其中data()方法即取出已经读取的数据,adjust()含义和`io_buf`含义一致。主要是`read_data()`方法。具体实现如下。

lars_reactor/src/reactor.cpp

  1. //从一个fd中读取数据到reactor_buf中
  2. int input_buf::read_data(int fd)
  3. {
  4. int need_read;//硬件有多少数据可以读
  5. //一次性读出所有的数据
  6. //需要给fd设置FIONREAD,
  7. //得到read缓冲中有多少数据是可以读取的
  8. if (ioctl(fd, FIONREAD, &need_read) == -1) {
  9. fprintf(stderr, "ioctl FIONREAD\n");
  10. return -1;
  11. }
  12. if (_buf == NULL) {
  13. //如果io_buf为空,从内存池申请
  14. _buf = buf_pool::instance()->alloc_buf(need_read);
  15. if (_buf == NULL) {
  16. fprintf(stderr, "no idle buf for alloc\n");
  17. return -1;
  18. }
  19. }
  20. else {
  21. //如果io_buf可用,判断是否够存
  22. assert(_buf->head == 0);
  23. if (_buf->capacity - _buf->length < (int)need_read) {
  24. //不够存,冲内存池申请
  25. io_buf *new_buf = buf_pool::instance()->alloc_buf(need_read+_buf->length);
  26. if (new_buf == NULL) {
  27. fprintf(stderr, "no ilde buf for alloc\n");
  28. return -1;
  29. }
  30. //将之前的_buf的数据考到新申请的buf中
  31. new_buf->copy(_buf);
  32. //将之前的_buf放回内存池中
  33. buf_pool::instance()->revert(_buf);
  34. //新申请的buf成为当前io_buf
  35. _buf = new_buf;
  36. }
  37. }
  38. //读取数据
  39. int already_read = 0;
  40. do {
  41. //读取的数据拼接到之前的数据之后
  42. if(need_read == 0) {
  43. //可能是read阻塞读数据的模式,对方未写数据
  44. already_read = read(fd, _buf->data + _buf->length, m4K);
  45. } else {
  46. already_read = read(fd, _buf->data + _buf->length, need_read);
  47. }
  48. } while (already_read == -1 && errno == EINTR); //systemCall引起的中断 继续读取
  49. if (already_read > 0) {
  50. if (need_read != 0) {
  51. assert(already_read == need_read);
  52. }
  53. _buf->length += already_read;
  54. }
  55. return already_read;
  56. }
  57. //取出读到的数据
  58. const char *input_buf::data() const
  59. {
  60. return _buf != NULL ? _buf->data + _buf->head : NULL;
  61. }
  62. //重置缓冲区
  63. void input_buf::adjust()
  64. {
  65. if (_buf != NULL) {
  66. _buf->adjust();
  67. }
  68. }

C. output_buf类

  1. 接下来就可以集成`reactor_buf`类实现`output_buf`类的设计了。

lars_reactor/include/reactor_buf.h

  1. //写(输出) 缓存buffer
  2. class output_buf : public reactor_buf
  3. {
  4. public:
  5. //将一段数据 写到一个reactor_buf中
  6. int send_data(const char *data, int datalen);
  7. //将reactor_buf中的数据写到一个fd中
  8. int write2fd(int fd);
  9. };
  1. `send_data()`方法主要是将数据写到`io_buf`中,实际上并没有做真正的写操作。而是当调用`write2fd`方法时,才会将`io_buf`的数据写到对应的fd中。send_data是做一些buf内存块的申请等工作。具体实现如下

lars_reactor/src/reactor.cpp

  1. //将一段数据 写到一个reactor_buf中
  2. int output_buf::send_data(const char *data, int datalen)
  3. {
  4. if (_buf == NULL) {
  5. //如果io_buf为空,从内存池申请
  6. _buf = buf_pool::instance()->alloc_buf(datalen);
  7. if (_buf == NULL) {
  8. fprintf(stderr, "no idle buf for alloc\n");
  9. return -1;
  10. }
  11. }
  12. else {
  13. //如果io_buf可用,判断是否够存
  14. assert(_buf->head == 0);
  15. if (_buf->capacity - _buf->length < datalen) {
  16. //不够存,冲内存池申请
  17. io_buf *new_buf = buf_pool::instance()->alloc_buf(datalen+_buf->length);
  18. if (new_buf == NULL) {
  19. fprintf(stderr, "no ilde buf for alloc\n");
  20. return -1;
  21. }
  22. //将之前的_buf的数据考到新申请的buf中
  23. new_buf->copy(_buf);
  24. //将之前的_buf放回内存池中
  25. buf_pool::instance()->revert(_buf);
  26. //新申请的buf成为当前io_buf
  27. _buf = new_buf;
  28. }
  29. }
  30. //将data数据拷贝到io_buf中,拼接到后面
  31. memcpy(_buf->data + _buf->length, data, datalen);
  32. _buf->length += datalen;
  33. return 0;
  34. }
  35. //将reactor_buf中的数据写到一个fd中
  36. int output_buf::write2fd(int fd)
  37. {
  38. assert(_buf != NULL && _buf->head == 0);
  39. int already_write = 0;
  40. do {
  41. already_write = write(fd, _buf->data, _buf->length);
  42. } while (already_write == -1 && errno == EINTR); //systemCall引起的中断,继续写
  43. if (already_write > 0) {
  44. //已经处理的数据清空
  45. _buf->pop(already_write);
  46. //未处理数据前置,覆盖老数据
  47. _buf->adjust();
  48. }
  49. //如果fd非阻塞,可能会得到EAGAIN错误
  50. if (already_write == -1 && errno == EAGAIN) {
  51. already_write = 0;//不是错误,仅仅返回0,表示目前是不可以继续写的
  52. }
  53. return already_write;
  54. }
  1. 现在我们已经完成了内存管理及读写buf机制的实现,接下来就要简单的测试一下,用我们之前的V0.1版本的reactor server来测试。

3.4 完成Lars Reactor V0.2开发

A. 修改tcp_server

  1. 主要修正do_accept()方法,加上reactor_buf机制.

lars_reactor/src/tcp_server.cpp

  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <string.h>
  4. #include <strings.h>
  5. #include <unistd.h>
  6. #include <signal.h>
  7. #include <sys/types.h> /* See NOTES */
  8. #include <sys/socket.h>
  9. #include <arpa/inet.h>
  10. #include <errno.h>
  11. #include "tcp_server.h"
  12. #include "reactor_buf.h"
  13. //server的构造函数
  14. tcp_server::tcp_server(const char *ip, uint16_t port)
  15. {
  16. //...
  17. }
  18. //开始提供创建链接服务
  19. void tcp_server::do_accept()
  20. {
  21. int connfd;
  22. while(true) {
  23. //accept与客户端创建链接
  24. printf("begin accept\n");
  25. connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen);
  26. if (connfd == -1) {
  27. if (errno == EINTR) {
  28. fprintf(stderr, "accept errno=EINTR\n");
  29. continue;
  30. }
  31. else if (errno == EMFILE) {
  32. //建立链接过多,资源不够
  33. fprintf(stderr, "accept errno=EMFILE\n");
  34. }
  35. else if (errno == EAGAIN) {
  36. fprintf(stderr, "accept errno=EAGAIN\n");
  37. break;
  38. }
  39. else {
  40. fprintf(stderr, "accept error");
  41. exit(1);
  42. }
  43. }
  44. else {
  45. //accept succ!
  46. int ret = 0;
  47. input_buf ibuf;
  48. output_buf obuf;
  49. char *msg = NULL;
  50. int msg_len = 0;
  51. do {
  52. ret = ibuf.read_data(connfd);
  53. if (ret == -1) {
  54. fprintf(stderr, "ibuf read_data error\n");
  55. break;
  56. }
  57. printf("ibuf.length() = %d\n", ibuf.length());
  58. //将读到的数据放在msg中
  59. msg_len = ibuf.length();
  60. msg = (char*)malloc(msg_len);
  61. bzero(msg, msg_len);
  62. memcpy(msg, ibuf.data(), msg_len);
  63. ibuf.pop(msg_len);
  64. ibuf.adjust();
  65. printf("recv data = %s\n", msg);
  66. //回显数据
  67. obuf.send_data(msg, msg_len);
  68. while(obuf.length()) {
  69. int write_ret = obuf.write2fd(connfd);
  70. if (write_ret == -1) {
  71. fprintf(stderr, "write connfd error\n");
  72. return;
  73. }
  74. else if(write_ret == 0) {
  75. //不是错误,表示此时不可写
  76. break;
  77. }
  78. }
  79. free(msg);
  80. } while (ret != 0);
  81. //Peer is closed
  82. close(connfd);
  83. }
  84. }
  85. }

编译生成新的liblreactor.a

  1. $cd lars_reactor/
  2. $make
  3. g++ -g -O2 -Wall -fPIC -Wno-deprecated -c -o src/tcp_server.o src/tcp_server.cpp -I./include
  4. g++ -g -O2 -Wall -fPIC -Wno-deprecated -c -o src/io_buf.o src/io_buf.cpp -I./include
  5. g++ -g -O2 -Wall -fPIC -Wno-deprecated -c -o src/reactor_buf.o src/reactor_buf.cpp -I./include
  6. g++ -g -O2 -Wall -fPIC -Wno-deprecated -c -o src/buf_pool.o src/buf_pool.cpp -I./include
  7. mkdir -p lib
  8. ar cqs lib/liblreactor.a src/tcp_server.o src/io_buf.o src/reactor_buf.o src/buf_pool.o

B. 编译V0.2 server APP

  1. 我们将lars_reactor/example/lars_reactor_0.1 的代码复制一份到 lars_reactor/example/lars_reactor_0.2中。

由于我们这里使用了pthread库,所以在lars_reactor_0.2的Makefile文件要加上pthread库的关联

lars_reactor/example/lars_reactor_0.2/Makefile

  1. CXX=g++
  2. CFLAGS=-g -O2 -Wall -fPIC -Wno-deprecated
  3. INC=-I../../include
  4. LIB=-L../../lib -llreactor -lpthread
  5. OBJS = $(addsuffix .o, $(basename $(wildcard *.cc)))
  6. all:
  7. $(CXX) -o lars_reactor $(CFLAGS) lars_reactor.cpp $(INC) $(LIB)
  8. clean:
  9. -rm -f *.o lars_reactor

编译在lars_reactor/example/lars_reactor_0.2/

  1. $ cd lars_reactor/example/lars_reactor_0.2/
  2. $ make
  3. g++ -o lars_reactor -g -O2 -Wall -fPIC -Wno-deprecated lars_reactor.cpp -I../../include -L../../lib -llreactor -lpthread

C. 测试

启动server

  1. $ ./lars_reactor
  2. begin accept

启动client

  1. $ nc 127.0.0.1 7777

客户端输入 文字,效果如下:

服务端:

  1. ibuf.length() = 21
  2. recv data = hello lars, By Aceld

客户端:

  1. $ nc 127.0.0.1 7777
  2. hello lars, By Aceld
  3. hello lars, By Aceld
  1. ok!现在我们的读写buffer机制已经成功的集成到我们的lars网络框架中了。