接下来为了让Reactor框架功能更加丰富,结合之前的功能,再加上udpserver的服务接口。udp我们暂时不考虑加线程池实现,只是单线程的处理方式。
12.1 udp_server服务端功能实现
lars_reactor/include/udp_server.h
#pragma once#include <netinet/in.h>#include "event_loop.h"#include "net_connection.h"#include "message.h"class udp_server :public net_connection{public:udp_server(event_loop *loop, const char *ip, uint16_t port);virtual int send_message(const char *data, int msglen, int msgid);//注册消息路由回调函数void add_msg_router(int msgid, msg_callback* cb, void *user_data = NULL);~udp_server();//处理消息业务void do_read();private:int _sockfd;char _read_buf[MESSAGE_LENGTH_LIMIT];char _write_buf[MESSAGE_LENGTH_LIMIT];//事件触发event_loop* _loop;//服务端ipstruct sockaddr_in _client_addr;socklen_t _client_addrlen;//消息路由分发msg_router _router;};
对应的方法实现方式如下:
lars_reactor/src/udp_server.cpp
#include <signal.h>#include <unistd.h>#include <strings.h>#include <sys/socket.h>#include <sys/types.h>#include <netinet/in.h>#include <arpa/inet.h>#include <stdio.h>#include <string.h>#include "udp_server.h"void read_callback(event_loop *loop, int fd, void *args){udp_server *server = (udp_server*)args;//处理业务函数server->do_read();}void udp_server::do_read(){while (true) {int pkg_len = recvfrom(_sockfd, _read_buf, sizeof(_read_buf), 0, (struct sockaddr *)&_client_addr, &_client_addrlen);if (pkg_len == -1) {if (errno == EINTR) {continue;}else if (errno == EAGAIN) {break;}else {perror("recvfrom\n");break;}}//处理数据msg_head head;memcpy(&head, _read_buf, MESSAGE_HEAD_LEN);if (head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0 || head.msglen + MESSAGE_HEAD_LEN != pkg_len) {//报文格式有问题fprintf(stderr, "do_read, data error, msgid = %d, msglen = %d, pkg_len = %d\n", head.msgid, head.msglen, pkg_len);continue;}//调用注册的路由业务_router.call(head.msgid, head.msglen, _read_buf+MESSAGE_HEAD_LEN, this);}}udp_server::udp_server(event_loop *loop, const char *ip, uint16_t port){//1 忽略一些信号if (signal(SIGHUP, SIG_IGN) == SIG_ERR) {perror("signal ignore SIGHUB");exit(1);}if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {perror("signal ignore SIGPIPE");exit(1);}//2 创建套接字//SOCK_CLOEXEC在execl中使用该socket则关闭,在fork中使用该socket不关闭_sockfd = socket(AF_INET, SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_UDP);if (_sockfd == -1) {perror("create udp socket");exit(1);}//3 设置服务ip+portstruct sockaddr_in servaddr;bzero(&servaddr, sizeof(servaddr));servaddr.sin_family = AF_INET;inet_aton(ip, &servaddr.sin_addr);//设置ipservaddr.sin_port = htons(port);//设置端口//4 绑定bind(_sockfd, (const struct sockaddr*)&servaddr, sizeof(servaddr));//3 添加读业务事件_loop = loop;bzero(&_client_addr, sizeof(_client_addr));_client_addrlen = sizeof(_client_addr);printf("server on %s:%u is running...\n", ip, port);_loop->add_io_event(_sockfd, read_callback, EPOLLIN, this);}int udp_server::send_message(const char *data, int msglen, int msgid){if (msglen 
> MESSAGE_LENGTH_LIMIT) {fprintf(stderr, "too large message to send\n");return -1;}msg_head head;head.msglen = msglen;head.msgid = msgid;memcpy(_write_buf, &head, MESSAGE_HEAD_LEN);memcpy(_write_buf + MESSAGE_HEAD_LEN, data, msglen);int ret = sendto(_sockfd, _write_buf, msglen + MESSAGE_HEAD_LEN, 0, (struct sockaddr*)&_client_addr, _client_addrlen);if (ret == -1) {perror("sendto()..");return -1;}return ret;}//注册消息路由回调函数void udp_server::add_msg_router(int msgid, msg_callback* cb, void *user_data){_router.register_msg_router(msgid, cb, user_data);}udp_server::~udp_server(){_loop->del_io_event(_sockfd);close(_sockfd);}
这里面实现的方式和tcp_server的实现方式几乎一样,需要注意的是,udp的socket编程是不需要listen的,而且也不需要accept。所以recvfrom就能够得知每个包的对应客户端是谁,然后回执消息给对应的客户端就可以。因为没有连接,所以都是以包为单位来处理的,一个包一个包处理。可能相邻的两个包来自不同的客户端。
12.2 udp_client客户端功能实现
lars_reactor/include/udp_client.h
#pragma once#include "net_connection.h"#include "message.h"#include "event_loop.h"class udp_client: public net_connection{public:udp_client(event_loop *loop, const char *ip, uint16_t port);~udp_client();void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL);virtual int send_message(const char *data, int msglen, int msgid);//处理消息void do_read();private:int _sockfd;char _read_buf[MESSAGE_LENGTH_LIMIT];char _write_buf[MESSAGE_LENGTH_LIMIT];//事件触发event_loop *_loop;//消息路由分发msg_router _router;};
lars_reactor/src/udp_client.cpp
#include "udp_client.h"

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>

// Read-event handler registered with the event loop; args is the udp_client
// that owns fd. Kept file-local (static) so it cannot clash at link time with
// same-named callbacks in other translation units (udp_server.cpp has one).
static void read_callback(event_loop *loop, int fd, void *args)
{
    udp_client *client = static_cast<udp_client*>(args);
    client->do_read();
}

// Creates a non-blocking UDP socket, connects it to ip:port and registers it
// for read events on loop. Any setup failure is fatal: an error is printed
// and the process exits with status 1.
udp_client::udp_client(event_loop *loop, const char *ip, uint16_t port)
{
    // 1. Create the socket.
    _sockfd = socket(AF_INET, SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_UDP);
    if (_sockfd == -1) {
        perror("create socket error");
        exit(1);
    }

    struct sockaddr_in servaddr;
    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    if (inet_aton(ip, &servaddr.sin_addr) == 0) {
        // inet_aton() returns 0 for an unparsable address; connecting to the
        // zeroed address instead would silently target the wrong peer.
        fprintf(stderr, "invalid ip address: %s\n", ip);
        exit(1);
    }
    servaddr.sin_port = htons(port);

    // 2. Connect. On a datagram socket this only records the default peer;
    //    send_message() relies on it by passing no destination address.
    int ret = connect(_sockfd, reinterpret_cast<const struct sockaddr*>(&servaddr), sizeof(servaddr));
    if (ret == -1) {
        perror("connect");
        exit(1);
    }

    // 3. Register the read event.
    _loop = loop;
    _loop->add_io_event(_sockfd, read_callback, EPOLLIN, this);
}

// Stops watching the socket, then closes it.
udp_client::~udp_client()
{
    _loop->del_io_event(_sockfd);
    close(_sockfd);
}

// Reads datagrams until the non-blocking socket reports EAGAIN, validating
// each one and dispatching it to the handler registered for its msgid.
void udp_client::do_read()
{
    while (true) {
        // The socket is connected, so the sender address is not needed.
        int pkt_len = recvfrom(_sockfd, _read_buf, sizeof(_read_buf), 0, NULL, NULL);
        if (pkt_len == -1) {
            if (errno == EINTR) {
                // Interrupted by a signal: retry.
                continue;
            }
            else if (errno == EAGAIN) {
                // Socket drained: wait for the next read event.
                break;
            }
            else {
                perror("recvfrom()");
                break;
            }
        }

        // A datagram shorter than the header cannot be parsed; without this
        // check the header would be read from stale bytes of a previous
        // datagram still sitting in _read_buf.
        if (pkt_len < static_cast<int>(MESSAGE_HEAD_LEN)) {
            fprintf(stderr, "do_read, data error, datagram too short, pkt_len = %d\n", pkt_len);
            continue;
        }

        // Parse the header of the packet received from the server.
        msg_head head;
        memcpy(&head, _read_buf, MESSAGE_HEAD_LEN);

        // One datagram must carry exactly one message: header + msglen bytes.
        if (head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0 || head.msglen + MESSAGE_HEAD_LEN != pkt_len) {
            // Malformed message: report and drop it.
            fprintf(stderr, "do_read, data error, msgid = %d, msglen = %d, pkt_len = %d\n", head.msgid, head.msglen, pkt_len);
            continue;
        }

        // Dispatch to the registered handler.
        _router.call(head.msgid, head.msglen, _read_buf + MESSAGE_HEAD_LEN, this);
    }
}

// Registers cb (with user_data) as the handler for messages carrying msgid.
void udp_client::add_msg_router(int msgid, msg_callback *cb, void *user_data)
{
    _router.register_msg_router(msgid, cb, user_data);
}

// Sends header + payload as one datagram to the connected server.
//
// data    payload bytes
// msglen  payload length; must be >= 0 and fit in the send buffer together
//         with the header
// msgid   message id placed in the header
//
// Returns the number of bytes sent, or -1 if the length is rejected or
// sendto() fails.
int udp_client::send_message(const char *data, int msglen, int msgid)
{
    // Header and payload are assembled in _write_buf, so the payload limit is
    // the buffer size minus the header; checking against the full buffer size
    // would let the second memcpy run past the end of the buffer.
    const int max_payload = static_cast<int>(sizeof(_write_buf)) - static_cast<int>(MESSAGE_HEAD_LEN);
    if (msglen > max_payload) {
        fprintf(stderr, "too large message to send\n");
        return -1;
    }
    if (msglen < 0) {
        fprintf(stderr, "invalid message length to send\n");
        return -1;
    }

    msg_head head;
    head.msglen = msglen;
    head.msgid = msgid;

    memcpy(_write_buf, &head, MESSAGE_HEAD_LEN);
    memcpy(_write_buf + MESSAGE_HEAD_LEN, data, msglen);

    // No destination address: the socket was connect()ed in the constructor.
    int ret = sendto(_sockfd, _write_buf, msglen + MESSAGE_HEAD_LEN, 0, NULL, 0);
    if (ret == -1) {
        perror("sendto()..");
        return -1;
    }

    return ret;
}
客户端和服务端代码除了构造函数不同,其他基本差不多。接下来我们可以测试一下udp的通信功能
12.3 完成Lars Reactor V0.10开发
服务端
server.cpp
#include <string>#include <string.h>#include "config_file.h"#include "udp_server.h"//回显业务的回调函数void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data){printf("callback_busi ...\n");//直接回显conn->send_message(data, len, msgid);}int main(){event_loop loop;//加载配置文件config_file::setPath("./serv.conf");std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");short port = config_file::instance()->GetNumber("reactor", "port", 8888);printf("ip = %s, port = %d\n", ip.c_str(), port);udp_server server(&loop, ip.c_str(), port);//注册消息业务路由server.add_msg_router(1, callback_busi);loop.event_process();return 0;}
客户端
client.cpp
#include <stdio.h>#include <string.h>#include "udp_client.h"//客户端业务void busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data){//得到服务端回执的数据char *str = NULL;str = (char*)malloc(len+1);memset(str, 0, len+1);memcpy(str, data, len);printf("recv server: [%s]\n", str);printf("msgid: [%d]\n", msgid);printf("len: [%d]\n", len);}int main(){event_loop loop;//创建udp客户端udp_client client(&loop, "127.0.0.1", 7777);//注册消息路由业务client.add_msg_router(1, busi);//发消息int msgid = 1;const char *msg = "Hello Lars!";client.send_message(msg, strlen(msg), msgid);//开启事件监听loop.event_process();return 0;}
启动服务端和客户端并运行,结果如下:
server
$ ./server
ip = 127.0.0.1, port = 7777
msg_router init...
server on 127.0.0.1:7777 is running...
add msg cb msgid = 1
call msgid = 1
call data = Hello Lars!
call msglen = 11
callback_busi ...
=======
client
$ ./client
msg_router init...
add msg cb msgid = 1
call msgid = 1
call data = Hello Lars!
call msglen = 11
recv server: [Hello Lars!]
msgid: [1]
len: [11]
=======
