接下来,为了让Reactor框架的功能更加丰富,我们在之前功能的基础上再加上udp_server的服务接口。UDP部分暂时不考虑用线程池实现,只采用单线程的处理方式。

12.1 udp_server服务端功能实现

lars_reactor/include/udp_server.h

  1. #pragma once
  2. #include <netinet/in.h>
  3. #include "event_loop.h"
  4. #include "net_connection.h"
  5. #include "message.h"
  6. class udp_server :public net_connection
  7. {
  8. public:
  9. udp_server(event_loop *loop, const char *ip, uint16_t port);
  10. virtual int send_message(const char *data, int msglen, int msgid);
  11. //注册消息路由回调函数
  12. void add_msg_router(int msgid, msg_callback* cb, void *user_data = NULL);
  13. ~udp_server();
  14. //处理消息业务
  15. void do_read();
  16. private:
  17. int _sockfd;
  18. char _read_buf[MESSAGE_LENGTH_LIMIT];
  19. char _write_buf[MESSAGE_LENGTH_LIMIT];
  20. //事件触发
  21. event_loop* _loop;
  22. //服务端ip
  23. struct sockaddr_in _client_addr;
  24. socklen_t _client_addrlen;
  25. //消息路由分发
  26. msg_router _router;
  27. };
对应的方法实现如下:

lars_reactor/src/udp_server.cpp

  1. #include <signal.h>
  2. #include <unistd.h>
  3. #include <strings.h>
  4. #include <sys/socket.h>
  5. #include <sys/types.h>
  6. #include <netinet/in.h>
  7. #include <arpa/inet.h>
  8. #include <stdio.h>
  9. #include <string.h>
  10. #include "udp_server.h"
  11. void read_callback(event_loop *loop, int fd, void *args)
  12. {
  13. udp_server *server = (udp_server*)args;
  14. //处理业务函数
  15. server->do_read();
  16. }
  17. void udp_server::do_read()
  18. {
  19. while (true) {
  20. int pkg_len = recvfrom(_sockfd, _read_buf, sizeof(_read_buf), 0, (struct sockaddr *)&_client_addr, &_client_addrlen);
  21. if (pkg_len == -1) {
  22. if (errno == EINTR) {
  23. continue;
  24. }
  25. else if (errno == EAGAIN) {
  26. break;
  27. }
  28. else {
  29. perror("recvfrom\n");
  30. break;
  31. }
  32. }
  33. //处理数据
  34. msg_head head;
  35. memcpy(&head, _read_buf, MESSAGE_HEAD_LEN);
  36. if (head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0 || head.msglen + MESSAGE_HEAD_LEN != pkg_len) {
  37. //报文格式有问题
  38. fprintf(stderr, "do_read, data error, msgid = %d, msglen = %d, pkg_len = %d\n", head.msgid, head.msglen, pkg_len);
  39. continue;
  40. }
  41. //调用注册的路由业务
  42. _router.call(head.msgid, head.msglen, _read_buf+MESSAGE_HEAD_LEN, this);
  43. }
  44. }
  45. udp_server::udp_server(event_loop *loop, const char *ip, uint16_t port)
  46. {
  47. //1 忽略一些信号
  48. if (signal(SIGHUP, SIG_IGN) == SIG_ERR) {
  49. perror("signal ignore SIGHUB");
  50. exit(1);
  51. }
  52. if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
  53. perror("signal ignore SIGPIPE");
  54. exit(1);
  55. }
  56. //2 创建套接字
  57. //SOCK_CLOEXEC在execl中使用该socket则关闭,在fork中使用该socket不关闭
  58. _sockfd = socket(AF_INET, SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_UDP);
  59. if (_sockfd == -1) {
  60. perror("create udp socket");
  61. exit(1);
  62. }
  63. //3 设置服务ip+port
  64. struct sockaddr_in servaddr;
  65. bzero(&servaddr, sizeof(servaddr));
  66. servaddr.sin_family = AF_INET;
  67. inet_aton(ip, &servaddr.sin_addr);//设置ip
  68. servaddr.sin_port = htons(port);//设置端口
  69. //4 绑定
  70. bind(_sockfd, (const struct sockaddr*)&servaddr, sizeof(servaddr));
  71. //3 添加读业务事件
  72. _loop = loop;
  73. bzero(&_client_addr, sizeof(_client_addr));
  74. _client_addrlen = sizeof(_client_addr);
  75. printf("server on %s:%u is running...\n", ip, port);
  76. _loop->add_io_event(_sockfd, read_callback, EPOLLIN, this);
  77. }
  78. int udp_server::send_message(const char *data, int msglen, int msgid)
  79. {
  80. if (msglen > MESSAGE_LENGTH_LIMIT) {
  81. fprintf(stderr, "too large message to send\n");
  82. return -1;
  83. }
  84. msg_head head;
  85. head.msglen = msglen;
  86. head.msgid = msgid;
  87. memcpy(_write_buf, &head, MESSAGE_HEAD_LEN);
  88. memcpy(_write_buf + MESSAGE_HEAD_LEN, data, msglen);
  89. int ret = sendto(_sockfd, _write_buf, msglen + MESSAGE_HEAD_LEN, 0, (struct sockaddr*)&_client_addr, _client_addrlen);
  90. if (ret == -1) {
  91. perror("sendto()..");
  92. return -1;
  93. }
  94. return ret;
  95. }
  96. //注册消息路由回调函数
  97. void udp_server::add_msg_router(int msgid, msg_callback* cb, void *user_data)
  98. {
  99. _router.register_msg_router(msgid, cb, user_data);
  100. }
  101. udp_server::~udp_server()
  102. {
  103. _loop->del_io_event(_sockfd);
  104. close(_sockfd);
  105. }
这里的实现方式和tcp_server几乎一样。需要注意的是,UDP socket编程不需要listen,也不需要accept;通过recvfrom就能得知每个包来自哪个客户端,然后把消息回执给对应的客户端即可。因为没有连接,所以都是以包为单位,一个包一个包地处理,相邻的两个包可能来自不同的客户端。

12.2 udp_client客户端功能实现

lars_reactor/include/udp_client.h

  1. #pragma once
  2. #include "net_connection.h"
  3. #include "message.h"
  4. #include "event_loop.h"
  5. class udp_client: public net_connection
  6. {
  7. public:
  8. udp_client(event_loop *loop, const char *ip, uint16_t port);
  9. ~udp_client();
  10. void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL);
  11. virtual int send_message(const char *data, int msglen, int msgid);
  12. //处理消息
  13. void do_read();
  14. private:
  15. int _sockfd;
  16. char _read_buf[MESSAGE_LENGTH_LIMIT];
  17. char _write_buf[MESSAGE_LENGTH_LIMIT];
  18. //事件触发
  19. event_loop *_loop;
  20. //消息路由分发
  21. msg_router _router;
  22. };

lars_reactor/src/udp_client.cpp

  1. #include "udp_client.h"
  2. #include <sys/types.h>
  3. #include <sys/socket.h>
  4. #include <arpa/inet.h>
  5. #include <unistd.h>
  6. #include <strings.h>
  7. #include <string.h>
  8. #include <stdio.h>
  9. void read_callback(event_loop *loop, int fd, void *args)
  10. {
  11. udp_client *client = (udp_client*)args;
  12. client->do_read();
  13. }
  14. udp_client::udp_client(event_loop *loop, const char *ip, uint16_t port)
  15. {
  16. //1 创建套接字
  17. _sockfd = socket(AF_INET, SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_UDP);
  18. if (_sockfd == -1) {
  19. perror("create socket error");
  20. exit(1);
  21. }
  22. struct sockaddr_in servaddr;
  23. bzero(&servaddr, sizeof(servaddr));
  24. servaddr.sin_family = AF_INET;
  25. inet_aton(ip, &servaddr.sin_addr);
  26. servaddr.sin_port = htons(port);
  27. //2 链接
  28. int ret = connect(_sockfd, (const struct sockaddr*)&servaddr, sizeof(servaddr));
  29. if (ret == -1) {
  30. perror("connect");
  31. exit(1);
  32. }
  33. //3 添加读事件
  34. _loop = loop;
  35. _loop->add_io_event(_sockfd, read_callback, EPOLLIN, this);
  36. }
  37. udp_client::~udp_client()
  38. {
  39. _loop->del_io_event(_sockfd);
  40. close(_sockfd);
  41. }
  42. //处理消息
  43. void udp_client::do_read()
  44. {
  45. while (true) {
  46. int pkt_len = recvfrom(_sockfd, _read_buf, sizeof(_read_buf), 0, NULL, NULL);
  47. if (pkt_len == -1) {
  48. if (errno == EINTR) {
  49. continue;
  50. }
  51. else if (errno == EAGAIN) {
  52. break;
  53. }
  54. else {
  55. perror("recvfrom()");
  56. break;
  57. }
  58. }
  59. //处理客户端包
  60. msg_head head;
  61. memcpy(&head, _read_buf, MESSAGE_HEAD_LEN);
  62. if (head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0 || head.msglen + MESSAGE_HEAD_LEN != pkt_len) {
  63. //报文格式有问题
  64. fprintf(stderr, "do_read, data error, msgid = %d, msglen = %d, pkt_len = %d\n", head.msgid, head.msglen, pkt_len);
  65. continue;
  66. }
  67. //调用注册的路由业务
  68. _router.call(head.msgid, head.msglen, _read_buf+MESSAGE_HEAD_LEN, this);
  69. }
  70. }
  71. void udp_client::add_msg_router(int msgid, msg_callback *cb, void *user_data)
  72. {
  73. _router.register_msg_router(msgid, cb, user_data);
  74. }
  75. int udp_client::send_message(const char *data, int msglen, int msgid)
  76. {
  77. if (msglen > MESSAGE_LENGTH_LIMIT) {
  78. fprintf(stderr, "too large message to send\n");
  79. return -1;
  80. }
  81. msg_head head;
  82. head.msglen = msglen;
  83. head.msgid = msgid;
  84. memcpy(_write_buf, &head, MESSAGE_HEAD_LEN);
  85. memcpy(_write_buf + MESSAGE_HEAD_LEN, data, msglen);
  86. int ret = sendto(_sockfd, _write_buf, msglen + MESSAGE_HEAD_LEN, 0, NULL, 0);
  87. if (ret == -1) {
  88. perror("sendto()..");
  89. return -1;
  90. }
  91. return ret;
  92. }
客户端和服务端的代码除了构造函数不同,其他基本差不多。接下来我们可以测试一下UDP的通信功能。

12.3 完成Lars Reactor V0.10开发

服务端

server.cpp

  1. #include <string>
  2. #include <string.h>
  3. #include "config_file.h"
  4. #include "udp_server.h"
  5. //回显业务的回调函数
  6. void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
  7. {
  8. printf("callback_busi ...\n");
  9. //直接回显
  10. conn->send_message(data, len, msgid);
  11. }
  12. int main()
  13. {
  14. event_loop loop;
  15. //加载配置文件
  16. config_file::setPath("./serv.conf");
  17. std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
  18. short port = config_file::instance()->GetNumber("reactor", "port", 8888);
  19. printf("ip = %s, port = %d\n", ip.c_str(), port);
  20. udp_server server(&loop, ip.c_str(), port);
  21. //注册消息业务路由
  22. server.add_msg_router(1, callback_busi);
  23. loop.event_process();
  24. return 0;
  25. }

客户端

client.cpp

  1. #include <stdio.h>
  2. #include <string.h>
  3. #include "udp_client.h"
  4. //客户端业务
  5. void busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
  6. {
  7. //得到服务端回执的数据
  8. char *str = NULL;
  9. str = (char*)malloc(len+1);
  10. memset(str, 0, len+1);
  11. memcpy(str, data, len);
  12. printf("recv server: [%s]\n", str);
  13. printf("msgid: [%d]\n", msgid);
  14. printf("len: [%d]\n", len);
  15. }
  16. int main()
  17. {
  18. event_loop loop;
  19. //创建udp客户端
  20. udp_client client(&loop, "127.0.0.1", 7777);
  21. //注册消息路由业务
  22. client.add_msg_router(1, busi);
  23. //发消息
  24. int msgid = 1;
  25. const char *msg = "Hello Lars!";
  26. client.send_message(msg, strlen(msg), msgid);
  27. //开启事件监听
  28. loop.event_process();
  29. return 0;
  30. }

启动服务端和客户端并运行,结果如下:

server

  1. $ ./server
  2. ip = 127.0.0.1, port = 7777
  3. msg_router init...
  4. server on 127.0.0.1:7777 is running...
  5. add msg cb msgid = 1
  6. call msgid = 1
  7. call data = Hello Lars!
  8. call msglen = 11
  9. callback_busi ...
  10. =======

client

  1. $ ./client
  2. msg_router init...
  3. add msg cb msgid = 1
  4. call msgid = 1
  5. call data = Hello Lars!
  6. call msglen = 11
  7. recv server: [Hello Lars!]
  8. msgid: [1]
  9. len: [11]
  10. =======