18.1 理解线程的概念

18.1.1 引入线程的背景

多进程模型的缺点如下:

  • 创建进程的过程会带来一定的开销
  • 为了完成进程间数据交换,需要特殊的IPC技术

如果是单核的CPU也是可以创建多个进程,只是在使用过程中,需要上处理机下处理机,这样一来非常的不方便,耗费了大量的资源。为了保持多进程的优点并摒弃其缺点,从而引进了多线程的服务。

线程相比进程具有如下优点:

  • 线程的创建和上下文切换比进程的创建和上下文切换更快
  • 线程间交换数据时无需特殊技术

18.1.2 线程和进程的差异

线程是为了解决什么困难登场的?为了得到更多条代码执行流而复制整个内存区域的负担太重了

每个进程的内存空间都由保存全局变量的“数据区”向malloc等函数的动态分配提供空间的堆函数运行时使用的栈构成。如果以获得多个代码执行流为主要目的则不需要完全分离内存结构,只需要分离栈区域

  • 上下文切换时不需要切换数据区和堆
  • 可以利用数据区和堆交换数据

所以说,为了保持这种结构,线程将在进程内创建并运行

  • 进程:在操作系统构成单独执行流的单位
  • 线程:在进程构成单独执行流的单位

18.2 线程创建及运行

下面介绍POSIX下线程的创建方法。

18.2.1 线程的创建和执行流程

  1. //线程具有单独的执行流,因此需要单独定义线程的main函数
  2. #include<pthread.h>
  3. int pthread_create(pthread_t *restrict thread, const pthread_attr_t *restrict attr, void *(* start_routine)(void *), void *restrict arg);
  4. //成功时返回0,失败时返回其他值
  5. //thread 保存新创建线程ID的变量地址值。这一点与线程相同,都需要去进行区分不同线程的ID
  6. //attr 用于传递线程属性的参数,传递NULL时,创建默认属性的线程
  7. //start_routine 相当于线程main函数,在单独执行流中执行的函数地址值(函数指针)
  8. //arg 通过第三个参数传递调用函数时包含传递参数信息的变量地址值
//thread1.c
#include<stdio.h>
#include<pthread.h>
#include<unistd.h>
void *thread_main(void *arg);

int main(int argc, char *argv[])
{
    pthread_t t_id;
    int thread_param=5;

    if(pthread_create(&t_id, NULL, thread_main, (void*)&thread_param)!=0)
    {
        puts("pthread_create() error");
        return -1;
    };
    sleep(10);
    puts("end of main");
    return 0;
}

void *thread_main(void *arg)
{
    int i;
    int cnt=*((int*)arg);
    for(i=0; i<cnt; i++)
    {
        sleep(1);
        puts("running thread");
    }
    return NULL;
}
zhang@zhang-virtual-machine:~/Desktop/Ctest/Demo35$ gcc thread1.c -o thread1 -lpthread # 注意-lpthread
zhang@zhang-virtual-machine:~/Desktop/Ctest/Demo35$ ./thread1
running thread
running thread
running thread
running thread
running thread
end of main

编译程序包括 预编译, 编译,汇编,链接,包含头文件了,仅能说明有了线程函数的声明, 但是还没有实现, 加上-lpthread是在链接阶段,链接这个库。pthread是动态库,需要用-lpthread,所有的动态库都需要用-lxxx来引用用gcc编译使用了POSIX thread的程序时通常需要加额外的选项,以便使用thread-safe的库及头文件,一些老的书里说直接增加链接选项 -lpthread 就可以了。而gcc手册里则指出应该在编译和链接时都增加 -pthread 选项编译选项中指定 -pthread 会附加一个宏定义-D_REENTRANT,该宏会导致 libc 头文件选择那些thread-safe的实现;链接选项中指定 -pthread 则同 -lpthread 一样,只表示链接 POSIX thread 库。由于 libc 用于适应 thread-safe 的宏定义可能变化,因此在编译和链接时都使用 -pthread 选项而不是传统的 -lpthread 能够保持向后兼容,并提高命令行的一致性。目前gcc 4.5.2中已经没有了关于-lpthread的介绍了。所以以后的多线程编译应该用-pthread,而不是-lpthread。(原文连接

如何不通过sleep函数来监控整个线程控制流,标准库中提供了以下函数:

#include<pthread.h>
int pthread_join(pthread_t thread, void **status);
//成功时返回0,失败时返回其他值
//thread 该参数值ID的线程终止后才会从该函数返回
//status 保存线程的main函数返回值的指针变量地址值
//thread2.c
#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>
#include<string.h>
#include<unistd.h>
void *thread_main(void *arg);

int main(int argc, char *argv[])
{
    pthread_t t_id;
    int thread_param=5;
    void *thr_ret;

    if(pthread_create(&t_id, NULL, thread_main, (void*)&thread_param)!=0)
    {
        puts("pthread_create() error");
        return -1;
    };

    if(pthread_join(t_id, &thr_ret)!=0)
    {
        puts("pthread_join() error");
        return -1;
    };

    printf("Thread return message: %s \n", (char *)thr_ret);
    free(thr_ret);
    return 0;
}

void *thread_main(void *arg)
{
    int i;
    int cnt=*((int*)arg);
    char *msg=(char *)malloc(sizeof(char)*50);
    strcpy(msg, "Hello, I'am thread~\n");

    for(i=0; i<cnt; i++)
    {
        sleep(1);
        puts("running thread");
    }
    return (void*)msg;
}

18.2.2 可在临界区内调用的函数

创建一个线程的方法和多个线程的方法一致,关于多个线程同时调用函数时可能会产生问题。这类函数内部存在临界区,也就是说多个线程同时执行这部分代码的时候,可能会引起问题。临界区中至少存在1条这类代码

根据临界区是否引起问题,函数可以分为以下两类

  • 线程安全函数
  • 非线程安全函数

强调一点:线程安全的函数中同样可能存在临界区,只是在线程安全函数中,同时被多个线程调用时可通过一些措施避免问题。线程安全函数的名称后缀通常为_r。比如该函数gethostbynamegethostbyname_r后面的就是安全版本,在编写代码中,我们需要将所有的gethostbyname函数替代为gethostbyname_r。同样可以在声明头文件前定义_REENTRANT宏,做法如下:

-D_REENTRANT # 在编译时直接添加该选项

18.2.3 工作线程模型

以上的介绍都只是单线程,并不是多个线程同时进行。如果把线程交给去工作去完成具体的指令,而main函数主要负责输出。所以我们可以理解为该模型是工作线程模型

//thread3.c
#include<stdio.h>
#include<pthread.h>
void *thread_summation(void *arg);
int sum=0;

int main(int argc, char *argv[])
{
    pthread_t id_t1, id_t2;
    int range1[]={1,5};
    int range2[]={6,10};
    pthread_create(&id_t1, NULL, thread_summation, (void *)range1);
    pthread_create(&id_t2, NULL, thread_summation, (void *)range2);

    pthread_join(id_t1, NULL);
    pthread_join(id_t2, NULL);
    printf("result: %d\n", sum);
    return 0;
}

void *thread_summation(void *arg)
{
    int start=((int*)arg)[0];
    int end=((int*)arg)[1];

    while(start<=end)
    {
        sum+=start;
        start++;
    }
    return NULL;
}

该运行结果虽然正确,但是存在一些临界区的问题,所以在下面展示问题代码:

//thread4.c
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<pthread.h>
#define NUM_THREAD 100

void *thread_inc(void *arg);
void *thread_des(void *arg);
long long num=0;

int main(int argc, char *argv[])
{
    pthread_t thread_id[NUM_THREAD];
    int i;

    printf("sizeof long long: %ld \n", sizeof(long long));
    for(i=0; i<NUM_THREAD; i++)
    {
        if(i%2)
            pthread_create(&(thread_id[i]), NULL, thread_inc, NULL);
        else
            pthread_create(&(thread_id[i]), NULL, thread_des, NULL);
    }

    for(i=0;i<NUM_THREAD;i++)
        pthread_join(thread_id[i], NULL);

    printf("result: %lld \n", num);
    return 0;
}

void *thread_inc(void *arg)
{
    int i;
    for(i=0; i<50000000; i++)
        num+=1;
    return NULL;
}

void *thread_des(void *arg)
{
    int i;
    for(i=0; i<50000000; i++)
        num-=1;
    return NULL;
}

该代码的运行结果不唯一,所以出现了问题,只是该问题现在不清楚是什么。

18.3 线程存在的问题和临界区

下面将分析thread4.c中产生问题的原因,并给出解决方案。

18.3.1 多个线程访问同一变量是问题

其中某一个线程访问的时候应该阻止其他线程的访问,这个就是线程的同步

18.3.2 临界区位置

定义:函数内同时运行多个线程时引起问题的多条语句构成的代码块

根据上述thread4.c我们可以观察其中引起问题的函数模块:

void *thread_inc(void *arg)
{
    int i;
    for(i=0; i<50000000; i++)
        num+=1; // 临界区
    return NULL;
}

void *thread_des(void *arg)
{
    int i;
    for(i=0; i<50000000; i++)
        num-=1; // 临界区
    return NULL;
}

产生的问题可以整理为以下三种类型:

  • 2个线程同时执行thread_inc函数
  • 2个线程同时执行thread_des函数
  • 2个线程同时执行thread_inc函数和thread_des函数

所以说2条不同的语句不同线程同时执行时,是有可能构成临界区前提是这2条语句访问同一内存空间

18.4 线程同步

18.4.1 同步的两面性

首先要明确,线程同步是要解决线程访问顺序引发的问题。

  • 同时访问同一内存空间时发生的情况
  • 需要指定访问同一内存空间的线程执行顺序的情况

18.4.2 互斥量

#include<pthread.h>

int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
//成功时返回0,失败时返回其他值
//mutex 创建互斥量时传递保存互斥量的变量地址值,销毁时传递需要销毁的互斥量地址值
//attr 传递即将创建的互斥量属性,没有特别需要指定的属性时传递NULL
//如果不需要特殊的互斥量属性,则向第二个参数传递NULL时,可以利用PTHREAD_MUTEX_INITIALIZER宏进行如下声明:
//pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER(但是推荐使用init函数进行初始化)
#include<pthread.h>

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
//成功时返回0,失败时返回其他值
//其他线程unlock临界区之前,会将当前请求进程一直处于阻塞状态
pthread_mutex_lock(&mutex);
//临界区开始
//.....
//临界区结束
pthread_mutex_unlock(&mutex);
//当线程退出临界区时,如果忘记调用unlock函数,该请求线程就无法摆脱阻塞状态
//这种情况下成为“死锁”
//mutex.c
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<pthread.h>
#define NUM_THREAD 100

void *thread_inc(void *arg);
void *thread_des(void *arg);

long long num = 0;
pthread_mutex_t mutex;

int main(int argc, char *argv[])
{
    pthread_t thread_id[NUM_THREAD];
    int i;

    pthread_mutex_init(&mutex, NULL);

    for(i=0;i<NUM_THREAD;i++)
    {
        if(i%2)
            pthread_create(&(thread_id[i]), NULL, thread_inc, NULL);
        else
            pthread_create(&(thread_id[i]), NULL, thread_des, NULL);
    }
    for(i=0;i<NUM_THREAD;i++)
        pthread_join(thread_id[i], NULL);
    printf("result: %lld\n", num);
    pthread_mutex_destroy(&mutex);
    return 0;
}

void *thread_inc(void *arg)
{
    int i;
    pthread_mutex_lock(&mutex);
    for(i=0; i<50000000; i++)
        num+=1;
    pthread_mutex_unlock(&mutex);
    return NULL;
}

void *thread_des(void *arg)
{
    int i;
    for(i=0; i<50000000; i++)
    {
        pthread_mutex_lock(&mutex);
        num-=1;
        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

由于临界区的划分,上锁解锁会影响时间,所以必须根据合适的需求去划分响应的临界区资源。

18.4.3 信号量

#include<semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_destroy(sem_t *sem);
//成功时返回0,失败时返回其他值
//sem创建信号量时传递保存信号量的变量地址值,销毁时传递需要销毁的信号量变量地址值
//pshared传递其他值时,创建可由多个进程共享的信号量,传递0时,创建只允许1个进程内部使用的信号量,我们需要完成同一进程内的线程同步,所以传递0
//value指定新创建的信号量初始值
#include<semaphore.h>

int sem_post(sem_t *sem);
int sem_wait(sem_t *sem);
//成功时返回0,失败时返回其他值
//sem 传递保存信号量读取值的变量地址值,传递给sem_post时信号量增1,传递给sem_wait时信号量减1
#include<stdio.h>
#include<pthread.h>
#include<semaphore.h>

void *read(void *arg);
void *accu(void *arg);
static sem_t sem_one;
static sem_t sem_two;
static int num;

int main(int argc, char *argv[])
{
    pthread_t id_t1, id_t2;
    sem_init(&sem_one, 0, 0);
    sem_init(&sem_two, 0, 1);

    pthread_create(&id_t1, NULL, read, NULL);
    pthread_create(&id_t2, NULL, accu, NULL);

    pthread_join(id_t1, NULL);
    pthread_join(id_t2, NULL);

    sem_destroy(&sem_one);
    sem_destroy(&sem_two);
    return 0;
}

void *read(void *arg)
{
    int i;
    for(i=0; i<5; i++)
    {
        fputs("Input num:",stdout);

        sem_wait(&sem_two);
        scanf("%d", &num);
        sem_post(&sem_one);
    }
    return NULL;
}

void *accu(void *arg)
{
    int sum=0, i;
    for(i=0; i<5; i++)
    {
        sem_wait(&sem_one);
        sum+=sum;
        sem_post(&sem_two);
    }
    printf("Result: %d\n", sum);
    return NULL;
}

18.5 线程的销毁和多线程并发服务器端的实现

18.5.1 销毁线程的3种方式

  • 调用pthread_join函数
  • 调用pthread_detach函数
#include<pthread.h>

int pthread_detach(pthread_t thread);
//成功时返回0,失败时返回其他值
//thread终止的同时需要销毁的线程ID

18.5.2 多线程并发服务器端的实现

//chat_server.c
//注意临界区代码
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<arpa/inet.h>
#include<sys/socket.h>
#include<pthread.h>
#include<semaphore.h>
#define BUF_SIZE 100
#define MAX_CLNT 256

void *handle_clnt(void *arg);
void send_msg(char *msg, int len);
void error_handling(char *message);

int clnt_cnt=0;
int clnt_socks[MAX_CLNT];
pthread_mutex_t mutx;

int main(int argc, char *argv[])
{
    int serv_sock, clnt_sock;
    struct sockaddr_in serv_adr, clnt_adr;
    int clnt_adr_sz;
    pthread_t t_id;
    if(argc!=2){
        printf("Usage: %s <port>\n", argv[0]);
        exit(1);
    }

    pthread_mutex_init(&mutx, NULL);
    serv_sock=socket(PF_INET, SOCK_STREAM, 0);

    memset(&serv_adr,0,sizeof(serv_adr));
    serv_adr.sin_family=AF_INET;
    serv_adr.sin_addr.s_addr=htonl(INADDR_ANY);
    serv_adr.sin_port=htons(atoi(argv[1]));

    if(bind(serv_sock, (struct sockaddr*)&serv_adr,sizeof(serv_adr))==-1)
        error_handling("bind() error");
    if(listen(serv_sock, 5)==-1)
        error_handling("listne() error");

    while(1)
    {
        clnt_adr_sz=sizeof(clnt_adr);
        clnt_sock=accept(serv_sock, (struct sockaddr*)&clnt_adr, &clnt_adr_sz);

        pthread_mutex_lock(&mutx);
        clnt_socks[clnt_cnt++]=clnt_sock;
        pthread_mutex_unlock(&mutx);

        pthread_create(&t_id, NULL, handle_clnt, (void*)&clnt_sock);
        pthread_detach(t_id);
        printf("Connected client IP:%s \n", inet_ntoa(clnt_adr.sin_addr));

    }
    close(serv_sock);
    return 0;
}

void *handle_clnt(void *arg)
{
    int clnt_sock=*((int*)arg);
    int str_len=0,i;
    char msg[BUF_SIZE];

    while((str_len=read(clnt_sock, msg, sizeof(msg)))!=0)
        send_msg(msg, str_len);
    pthread_mutex_lock(&mutx);
    for(i=0;i<clnt_cnt;i++)
    {
        if(clnt_sock==clnt_socks[i])
        {
            while(i++<clnt_cnt-1)
                clnt_socks[i]=clnt_socks[i+1];
            break;
        }
    }
    clnt_cnt--;
    pthread_mutex_unlock(&mutx);
    close(clnt_sock);
    return NULL;
}

void send_msg(char *msg, int len)// send to all
{
    int i;
    pthread_mutex_lock(&mutx);
    for(i=0; i<clnt_cnt; i++)
        write(clnt_socks[i], msg, len);
    pthread_mutex_unlock(&mutx);
}

void error_handling(char *message)
{
    fputs(message, stderr);
    fputc('\n', stderr);
    exit(1);
}
zhang@zhang-virtual-machine:~/Desktop/Ctest/Demo36$ gcc chat_server.c -D_REENTRANT -o cserv -pthread
//chat_clnt.c
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<arpa/inet.h>
#include<sys/socket.h>
#include<pthread.h>
#include<semaphore.h>
#define BUF_SIZE 100
#define NAME_SIZE 20

void *send_msg(void *arg);
void *recv_msg(void *arg);
void error_handling(char *message);

char name[NAME_SIZE]="[DEFAULT]";
char msg[BUF_SIZE];

int main(int argc, char *argv[])
{
    int sock;
    struct sockaddr_in serv_addr;
    pthread_t snd_thread, rcv_thread;
    void *thread_return;
    if(argc!=4){
        printf("Usage:%s<IP> <port> <name>\n",argv[0]);
        exit(1);
    }

    sprintf(name, "[%s]", argv[3]);
    sock=socket(PF_INET, SOCK_STREAM, 0);

    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family=AF_INET;
    serv_addr.sin_addr.s_addr=inet_addr(argv[1]);
    serv_addr.sin_port=htons(atoi(argv[2]));

    if(connect(sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr))==-1)
        error_handling("connect() error");

    pthread_create(&snd_thread, NULL, send_msg, (void*)&sock);
    pthread_create(&rcv_thread, NULL, recv_msg, (void*)&sock);
    pthread_join(snd_thread, &thread_return);
    pthread_join(rcv_thread, &thread_return);
    close(sock);
    return 0;
}

void *send_msg(void *arg) //send thread main
{
    int sock=*((int*)arg);
    char name_msg[NAME_SIZE+BUF_SIZE];
    while(1)
    {
        fgets(msg, BUF_SIZE, stdin);
        if(!strcmp(msg, "q\n")||!strcmp(msg,"Q\n"))
        {
            close(sock);
            exit(0);
        }
        sprintf(name_msg, "%s %s", name, msg);
        write(sock, name_msg, strlen(name_msg));        
    }
    return NULL;
}

void *recv_msg(void *arg) //read thread main
{
    int sock=*((int*)arg);
    char name_msg[NAME_SIZE+BUF_SIZE];
    int str_len;
    while(1)
    {
        str_len=read(sock, name_msg, NAME_SIZE+BUF_SIZE-1);
        if(str_len==-1)
            return (void*)-1;
        name_msg[str_len]=0;
        fputs(name_msg, stdout);
    }
    return NULL;
}

void error_handling(char *message)
{
    fputs(message, stderr);
    fputc('\n', stderr);
    exit(1);
}
# server
zhang@zhang-virtual-machine:~/Desktop/Ctest/Demo36$ ./cserv 9190
Connected client IP:127.0.0.1 
Connected client IP:127.0.0.1
# client_one
zhang@zhang-virtual-machine:~/Desktop/Ctest/Demo36$ ./cclnt 127.0.0.1 9190 Zhang
nihao
[Zhang] nihao
[JY] sad
oh?
[Zhang] oh?
# client_two
zhang@zhang-virtual-machine:~/Desktop/Ctest/Demo36$ ./cclnt 127.0.0.1 9190 JY
sad
[JY] sad
[Zhang] oh?