从一个例子开始

保持对连接有效性的检测,是我们在实战中必须要注意的一个点。

TCP Keep-Alive 选项

定义一个时间段,在这个时间段内,如果没有任何连接相关的活动,TCP 保活机制会开始作用,每隔一个时间间隔,发送一个探测报文,该探测报文包含的数据非常少,如果连续几个探测报文都没有得到响应,则认为当前的 TCP 连接已经死亡,系统内核将错误信息通知给上层应用程序。

上述定义中的时间段、时间间隔和探测次数,分别称为保活时间、保活时间间隔和保活探测次数。在 Linux 系统中,它们分别对应以下 sysctl 变量:

  • net.ipv4.tcp_keepalive_time
  • net.ipv4.tcp_keepalive_intvl
  • net.ipv4.tcp_keepalive_probes
  • 以上三个变量的默认值依次是 7200 秒(2 小时)、75 秒和 9 次探测。

TCP 保活机制默认是关闭的,当我们选择打开时,可以分别在连接的两个方向上开启,也可以单独在一个方向上开启。

  • 如果开启服务器端到客户端的检测,就可以在客户端非正常断连的情况下清除在服务器端保留的“脏数据”;
  • 而开启客户端到服务器端的检测,就可以在服务器无响应的情况下,重新发起连接。

应用层探活

有两个比较关键的点:

  • 第一个是需要使用定时器,这可以通过使用 I/O 复用自身的机制来实现;
  • 第二个是需要设计一个 PING-PONG 的协议。

消息格式设计

  /*
   * Fixed-size message exchanged between the heartbeat client and server.
   * Every message is sent as one whole messageObject.
   */
  typedef struct {
      u_int32_t type;    /* one of the MSG_* codes below; the client sends it
                            through htonl() and the server decodes it with
                            ntohl() */
      char data[1024];   /* payload bytes */
  } messageObject;

  /* Message codes carried in messageObject.type. */
  #define MSG_PING 1     /* heartbeat probe sent by the client */
  #define MSG_PONG 2     /* reply the server sends for a MSG_PING */
  #define MSG_TYPE1 11   /* application message; the server only logs it */
  #define MSG_TYPE2 21   /* application message; the server only logs it */

客户端程序设计

  1. #include "lib/common.h"
  2. #include "message_objecte.h"
  3. #define MAXLINE 4096
  4. #define KEEP_ALIVE_TIME 10
  5. #define KEEP_ALIVE_INTERVAL 3
  6. #define KEEP_ALIVE_PROBETIMES 3
  7. int main(int argc, char **argv) {
  8. if (argc != 2) {
  9. error(1, 0, "usage: tcpclient <IPaddress>");
  10. }
  11. int socket_fd;
  12. socket_fd = socket(AF_INET, SOCK_STREAM, 0);
  13. struct sockaddr_in server_addr;
  14. bzero(&server_addr, sizeof(server_addr));
  15. server_addr.sin_family = AF_INET;
  16. server_addr.sin_port = htons(SERV_PORT);
  17. inet_pton(AF_INET, argv[1], &server_addr.sin_addr);
  18. socklen_t server_len = sizeof(server_addr);
  19. int connect_rt = connect(socket_fd, (struct sockaddr *) &server_addr, server_len);
  20. if (connect_rt < 0) {
  21. error(1, errno, "connect failed ");
  22. }
  23. char recv_line[MAXLINE + 1];
  24. int n;
  25. fd_set readmask;
  26. fd_set allreads;
  27. struct timeval tv;
  28. int heartbeats = 0;
  29. tv.tv_sec = KEEP_ALIVE_TIME;
  30. tv.tv_usec = 0;
  31. messageObject messageObject;
  32. FD_ZERO(&allreads);
  33. FD_SET(socket_fd, &allreads);
  34. for (;;) {
  35. readmask = allreads;
  36. int rc = select(socket_fd + 1, &readmask, NULL, NULL, &tv);
  37. if (rc < 0) {
  38. error(1, errno, "select failed");
  39. }
  40. if (rc == 0) {
  41. if (++heartbeats > KEEP_ALIVE_PROBETIMES) {
  42. error(1, 0, "connection dead\n");
  43. }
  44. printf("sending heartbeat #%d\n", heartbeats);
  45. messageObject.type = htonl(MSG_PING);
  46. rc = send(socket_fd, (char *) &messageObject, sizeof(messageObject), 0);
  47. if (rc < 0) {
  48. error(1, errno, "send failure");
  49. }
  50. tv.tv_sec = KEEP_ALIVE_INTERVAL;
  51. continue;
  52. }
  53. if (FD_ISSET(socket_fd, &readmask)) {
  54. n = read(socket_fd, recv_line, MAXLINE);
  55. if (n < 0) {
  56. error(1, errno, "read error");
  57. } else if (n == 0) {
  58. error(1, 0, "server terminated \n");
  59. }
  60. printf("received heartbeat, make heartbeats to 0 \n");
  61. heartbeats = 0;
  62. tv.tv_sec = KEEP_ALIVE_TIME;
  63. }
  64. }
  65. }

服务器端程序设计

#include "lib/common.h"
#include "message_objecte.h"

static int count;

/*
 * Read exactly one messageObject from fd.
 *
 * TCP is a byte stream, so a single read() may return only part of a
 * message; keep reading until the whole fixed-size record has arrived.
 *
 * Returns sizeof(*message) on success, 0 if the peer closed the connection
 * before a complete message arrived, and -1 on a read error (errno is set by
 * read()).
 */
static ssize_t read_message(int fd, messageObject *message) {
    char *cursor = (char *) message;
    size_t remaining = sizeof(*message);

    while (remaining > 0) {
        ssize_t n = read(fd, cursor, remaining);
        if (n < 0) {
            if (errno == EINTR) {
                continue;  /* interrupted by a signal: retry */
            }
            return -1;
        }
        if (n == 0) {
            return 0;
        }
        cursor += n;
        remaining -= (size_t) n;
    }

    return (ssize_t) sizeof(*message);
}

/*
 * Heartbeat test server.
 *
 * Listens on SERV_PORT, accepts a single client and processes its messages
 * in a loop. A MSG_PING is answered with a MSG_PONG after sleeping for the
 * number of seconds given in argv[1], which lets the reply be delayed on
 * purpose. MSG_TYPE1 and MSG_TYPE2 are only logged. Any failure, an unknown
 * message type, or the client closing the connection ends the program
 * through error().
 */
int main(int argc, char **argv) {
    if (argc != 2) {
        error(1, 0, "usage: tcpserver <sleepingtime>");
    }

    int sleepingTime = atoi(argv[1]);
    if (sleepingTime < 0) {
        /* sleep() takes an unsigned value; a negative one would become huge. */
        error(1, 0, "sleepingtime must not be negative");
    }

    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0) {
        error(1, errno, "socket failed ");
    }

    struct sockaddr_in server_addr;
    bzero(&server_addr, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(SERV_PORT);

    int rt1 = bind(listenfd, (struct sockaddr *) &server_addr, sizeof(server_addr));
    if (rt1 < 0) {
        error(1, errno, "bind failed ");
    }

    int rt2 = listen(listenfd, LISTENQ);
    if (rt2 < 0) {
        error(1, errno, "listen failed ");
    }

    int connfd;
    struct sockaddr_in client_addr;
    socklen_t client_len = sizeof(client_addr);

    if ((connfd = accept(listenfd, (struct sockaddr *) &client_addr, &client_len)) < 0) {
        error(1, errno, "accept failed ");
    }

    messageObject message;
    count = 0;

    for (;;) {
        ssize_t n = read_message(connfd, &message);
        if (n < 0) {
            error(1, errno, "error read");
        } else if (n == 0) {
            error(1, 0, "client closed \n");
        }

        printf("received %d bytes\n", (int) n);
        count++;

        /* The type field travels in network byte order. */
        uint32_t type = ntohl(message.type);

        switch (type) {
            case MSG_TYPE1 :
                printf("process  MSG_TYPE1 \n");
                break;

            case MSG_TYPE2 :
                printf("process  MSG_TYPE2 \n");
                break;

            case MSG_PING: {
                /* Zero the reply so no uninitialised stack bytes are sent. */
                messageObject pong_message;
                bzero(&pong_message, sizeof(pong_message));
                /* Encode the type the same way the client encodes MSG_PING. */
                pong_message.type = htonl(MSG_PONG);

                /* Delay the reply on purpose to exercise the client's probing. */
                sleep((unsigned int) sleepingTime);

                ssize_t rc = send(connfd, (char *) &pong_message, sizeof(pong_message), 0);
                if (rc < 0) {
                    error(1, errno, "send failure");
                }
                break;
            }

            default :
                error(1, 0, "unknown message type (%u)\n", (unsigned int) type);
        }
    }
}

实验

第一次实验,服务器端休眠时间为 60 秒。

$./pingclient 127.0.0.1
sending heartbeat #1
sending heartbeat #2
sending heartbeat #3
connection dead


$./pingserver 60
received 1028 bytes
received 1028 bytes

第二次实验,我们让服务器端休眠时间为 5 秒。

$./pingclient 127.0.0.1
sending heartbeat #1
sending heartbeat #2
received heartbeat, make heartbeats to 0
received heartbeat, make heartbeats to 0
sending heartbeat #1
sending heartbeat #2
received heartbeat, make heartbeats to 0
received heartbeat, make heartbeats to 0

$./pingserver 5
received 1028 bytes
received 1028 bytes
received 1028 bytes
received 1028 bytes