线程模型:

线程(thread)是运行在进程中的一个“逻辑流”,现代操作系统都允许在单进程中运行多个线程。线程由操作系统内核管理。每个线程都有自己的上下文(context),包括一个可以唯一标识线程的 ID(thread ID,或者叫 tid)、栈、程序计数器、寄存器等。在同一个进程中,所有的线程共享该进程的整个虚拟地址空间,包括代码、数据、堆、共享库等。

POSIX 线程模型

POSIX 线程是现代 UNIX 系统提供的处理线程的标准接口。POSIX 定义的线程函数大约有 60 多个,这些函数可以帮助我们创建线程、回收线程。接下来我们先看一个简单的例子程序。

  1. int another_shared = 0;
  2. void thread_run(void *arg) {
  3. int *calculator = (int *) arg;
  4. printf("hello, world, tid == %d \n", pthread_self());
  5. for (int i = 0; i < 1000; i++) {
  6. *calculator += 1;
  7. another_shared += 1;
  8. }
  9. }
  10. int main(int c, char **v) {
  11. int calculator;
  12. pthread_t tid1;
  13. pthread_t tid2;
  14. pthread_create(&tid1, NULL, thread_run, &calculator);
  15. pthread_create(&tid2, NULL, thread_run, &calculator);
  16. pthread_join(tid1, NULL);
  17. pthread_join(tid2, NULL);
  18. printf("calculator is %d \n", calculator);
  19. printf("another_shared is %d \n", another_shared);
  20. }
  • pthread_join: 函数等待子线程结束

运行这个程序,很幸运,计算的结果是正确的:

$./thread-helloworld
hello, world, tid == 125607936 
hello, world, tid == 126144512 
calculator is 2000 
another_shared is 2000

主要线程函数

创建线程

int pthread_create(pthread_t *tid, const pthread_attr_t *attr,
           void *(*func)(void *), void *arg);

返回:若成功则为0,若出错则为正的Exxx值
  • tid: 线程 id, 结果参数
  • attr: 线程属性
    • 优先级
    • 守护进程

在新线程的入口函数内,可以执行 pthread_self 函数返回线程 tid:

pthread_t pthread_self(void)

终止线程

当调用这个函数之后,父线程会等待其他所有的子线程终止,之后父线程自己终止:

void pthread_exit(void *status)

也可以通过调用 pthread_cancel 来主动终止一个子线程,和 pthread_exit 不同的是,它可以指定某个子线程终止。

int pthread_cancel(pthread_t tid)

回收已终止线程的资源

当调用 pthread_join 时,主线程会阻塞,直到对应 tid 的子线程自然终止。和 pthread_cancel 不同的是,它不会强迫子线程终止。

int pthread_join(pthread_t tid, void ** thread_return)

分离线程

一个线程的重要属性是可结合的,或者是分离的。一个可结合的线程是能够被其他线程杀死和回收资源的;而一个分离的线程不能被其他线程杀死或回收资源。一般来说,默认的属性是可结合的。

int pthread_detach(pthread_t tid)

在高并发的例子里,每个连接都由一个线程单独处理,在这种情况下,服务器程序并不需要对每个子线程进行终止,这样的话,每个子线程可以在入口函数开始的地方,把自己设置为分离的,这样就能在它终止后自动回收相关的线程资源了,就不需要调用 pthread_join 函数了。
**

每个连接一个线程处理

服务端程序:

#include "lib/common.h"

extern void loop_echo(int);

void thread_run(void *arg) {
    pthread_detach(pthread_self()); // 分离
    int fd = (int) arg;
    loop_echo(fd);
}

int main(int c, char **v) {
    int listener_fd = tcp_server_listen(SERV_PORT);
    pthread_t tid;

    while (1) {
        struct sockaddr_storage ss;
        socklen_t slen = sizeof(ss);
        int fd = accept(listener_fd, (struct sockaddr *) &ss, &slen);
        if (fd < 0) {
            error(1, errno, "accept failed");
        } else {
            pthread_create(&tid, NULL, &thread_run, (void *) fd);
        }
    }

    return 0;
}

这个程序的第 18 行阻塞调用在 accept 上,一旦有新连接建立,阻塞调用返回,调用 pthread_create 创建一个子线程来处理这个连接。

将子线程转变为分离的,也就意味着子线程独自负责线程资源回收
**
loop_echo 的程序:

char rot13_char(char c) {
    if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
        return c + 13;
    else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
        return c - 13;
    else
        return c;
}

void loop_echo(int fd) {
    char outbuf[MAX_LINE + 1];
    size_t outbuf_used = 0;
    ssize_t result;
    while (1) {
        char ch;
        result = recv(fd, &ch, 1, 0);

        //断开连接或者出错
        if (result == 0) {
            break;
        } else if (result == -1) {
            error(1, errno, "read error");
            break;
        }

        if (outbuf_used < sizeof(outbuf)) {
            outbuf[outbuf_used++] = rot13_char(ch);
        }

        if (ch == '\n') {
            send(fd, outbuf, outbuf_used, 0);
            outbuf_used = 0;
            continue;
        }
    }
}

运行这个程序之后,开启多个 telnet 客户端,可以看到这个服务器程序可以处理多个并发连接并回送数据。单独一个连接退出也不会影响其他连接的数据收发.

$telnet 127.0.0.1 43211
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
aaa
nnn
^]
telnet> quit
Connection closed.

构建线程池处理多个连接

image.png

这个程序的关键是连接字队列的设计,因为这里既有往这个队列里放置描述符的操作,也有从这个队列里取出描述符的操作。

  • mutex
  • condition: 条件变量则是在多个线程需要交互的情况下,用来线程间同步的原语。
//定义一个队列
typedef struct {
    int number;  //队列里的描述字最大个数
    int *fd;     //这是一个数组指针
    int front;   //当前队列的头位置
    int rear;    //当前队列的尾位置
    pthread_mutex_t mutex;  //锁
    pthread_cond_t cond;    //条件变量
} block_queue;

//初始化队列
void block_queue_init(block_queue *blockQueue, int number) {
    blockQueue->number = number;
    blockQueue->fd = calloc(number, sizeof(int));
    blockQueue->front = blockQueue->rear = 0;
    pthread_mutex_init(&blockQueue->mutex, NULL);
    pthread_cond_init(&blockQueue->cond, NULL);
}

//往队列里放置一个描述字fd
void block_queue_push(block_queue *blockQueue, int fd) {
    //一定要先加锁,因为有多个线程需要读写队列
    pthread_mutex_lock(&blockQueue->mutex);
    //将描述字放到队列尾的位置
    blockQueue->fd[blockQueue->rear] = fd;
    //如果已经到最后,重置尾的位置
    if (++blockQueue->rear == blockQueue->number) {
        blockQueue->rear = 0;
    }
    printf("push fd %d", fd);
    //通知其他等待读的线程,有新的连接字等待处理
    pthread_cond_signal(&blockQueue->cond);
    //解锁
    pthread_mutex_unlock(&blockQueue->mutex);
}

//从队列里读出描述字进行处理
int block_queue_pop(block_queue *blockQueue) {
    //加锁
    pthread_mutex_lock(&blockQueue->mutex);
    //判断队列里没有新的连接字可以处理,就一直条件等待,直到有新的连接字入队列
    while (blockQueue->front == blockQueue->rear)
        pthread_cond_wait(&blockQueue->cond, &blockQueue->mutex);
    //取出队列头的连接字
    int fd = blockQueue->fd[blockQueue->front];
    //如果已经到最后,重置头的位置
    if (++blockQueue->front == blockQueue->number) {
        blockQueue->front = 0;
    }
    printf("pop fd %d", fd);
    //解锁
    pthread_mutex_unlock(&blockQueue->mutex);
    //返回连接字
    return fd;
}

服务端程序:

void thread_run(void *arg) {
    pthread_t tid = pthread_self();
    pthread_detach(tid);

    block_queue *blockQueue = (block_queue *) arg;
    while (1) {
        int fd = block_queue_pop(blockQueue);
        printf("get fd in thread, fd==%d, tid == %d", fd, tid);
        loop_echo(fd);
    }
}

int main(int c, char **v) {
    int listener_fd = tcp_server_listen(SERV_PORT);

    block_queue blockQueue;
    block_queue_init(&blockQueue, BLOCK_QUEUE_SIZE);

    thread_array = calloc(THREAD_NUMBER, sizeof(Thread));
    int i;
    for (i = 0; i < THREAD_NUMBER; i++) {
        pthread_create(&(thread_array[i].thread_tid), NULL, &thread_run, (void *) &blockQueue);
    }

    while (1) {
        struct sockaddr_storage ss;
        socklen_t slen = sizeof(ss);
        int fd = accept(listener_fd, (struct sockaddr *) &ss, &slen);
        if (fd < 0) {
            error(1, errno, "accept failed");
        } else {
            block_queue_push(&blockQueue, fd);
        }
    }

    return 0;
}