为社么要有线程?

对于任何一个进程来讲,即便我们没有主动去创建线程,进程也是默认有一个主线程的。线程是负责执行二进制指令的,它会根据项目执行计划书,一行一行执行下去。进程要比线程管的宽多了,除了执行指令之外,内存、文件系统等等都要它来管。

进程相当于一个项目,而线程就是为了完成项目需求,而建立的一个个开发任务。默认情况下,你可以建一个大的任务,就是完成某某功能,然后交给一个人让它从头做到尾,这就是主线程。但是有时候,你发现任务是可以拆解的,如果相关性没有非常大前后关联关系,就可以并行执行。

进程类似于立项成功后的项目组,线程负责执行二进制代码,类似于项目组的开发同学。 为啥是进程内多线程而不是直接多进程呢? 因为进程的创建比较麻烦,要做资源申请、内存独占,同时进程间的通信(数据传递)很麻烦,而进程内线程通信就非常方便,易于控制和协调。

创建线程

举例多线程打印日志,并生成一个一百以内的随机数,作为下载时间返回。

  1. #include <pthread.h>
  2. #include <stdio.h>
  3. #include <stdlib.h>
  4. #define NUM_OF_TASKS 5
  5. void *downloadfile(void *filename)
  6. //线程运行函数,参数是 void 指针类型,用于接收任何类型的参数
  7. {
  8. printf("I am downloading the file %s!\n", (char *)filename);
  9. sleep(10);
  10. long downloadtime = rand()%100;
  11. printf("I finish downloading the file within %d minutes!\n", downloadtime);
  12. pthread_exit((void *)downloadtime);
  13. //将参数转换为(void*)类型,这是线程退出的返回值
  14. }
  15. int main(int argc, char *argv[])
  16. {
  17. char files[NUM_OF_TASKS][20]={"file1.avi","file2.rmvb","file3.mp4","file4.wmv","file5.flv"};
  18. pthread_t threads[NUM_OF_TASKS];
  19. int rc;
  20. int t;
  21. int downloadtime;
  22. pthread_attr_t thread_attr;
  23. //声明一个线程属性并初始化
  24. pthread_attr_init(&thread_attr);
  25. pthread_attr_setdetachstate(&thread_attr,PTHREAD_CREATE_JOINABLE);
  26. //PTH...表示将来主线程等待这个线程结束,并且获取退出时的状态。
  27. for(t=0;t<NUM_OF_TASKS;t++){
  28. printf("creating thread %d, please help me to download %s\n", t, files[t]);
  29. rc = pthread_create(&threads[t], &thread_attr, downloadfile, (void *)files[t]);
  30. //参数一:线程对象;参数二:线程属性;参数三:线程运行函数;参数四:线程运行函数的参数
  31. //主线程就是通过第四个参设将自己的任务分配给子线程
  32. if (rc){
  33. printf("ERROR; return code from pthread_create() is %d\n", rc);
  34. exit(-1);
  35. }
  36. }
  37. pthread_attr_destroy(&thread_attr);
  38. //销毁线程属性
  39. for(t=0;t<NUM_OF_TASKS;t++){
  40. pthread_join(threads[t],(void**)&downloadtime);
  41. //等待线程结束
  42. printf("Thread %d downloads the file %s in %d minutes.\n",t,files[t],downloadtime);
  43. }
  44. pthread_exit(NULL);
  45. //主线程结束
  46. }

任务分配完毕,每个线程下载一个文件,接下来主线程要做的事情就是等待这些子任务完成。当一个线程退出的时候,就会发送信号给其他所有同进程的线程。有一个线程使用 pthread_join 获取这个线程退出的返回值。线程的返回值通过 pthread_join 传给主线程,这样子线程就将自己下载文件所耗费的时间,告诉给主线程。

//多线程程序要依赖于 libpthread.so
gcc download.c -lpthread


# ./a.out
creating thread 0, please help me to download file1.avi
creating thread 1, please help me to download file2.rmvb
I am downloading the file file1.avi!
creating thread 2, please help me to download file3.mp4
I am downloading the file file2.rmvb!
creating thread 3, please help me to download file4.wmv
I am downloading the file file3.mp4!
creating thread 4, please help me to download file5.flv
I am downloading the file file4.wmv!
I am downloading the file file5.flv!
I finish downloading the file within 83 minutes!
I finish downloading the file within 77 minutes!
I finish downloading the file within 86 minutes!
I finish downloading the file within 15 minutes!
I finish downloading the file within 93 minutes!
Thread 0 downloads the file file1.avi in 83 minutes.
Thread 1 downloads the file file2.rmvb in 86 minutes.
Thread 2 downloads the file file3.mp4 in 77 minutes.
Thread 3 downloads the file file4.wmv in 93 minutes.
Thread 4 downloads the file file5.flv in 15 minutes.

e38c28b0972581d009ef16f1ebdee2bd.webp

线程的数据

线程可以将项目并行起来,加快进度,但是也带来的负面影响,过程并行起来了,那数据呢?我们把线程访问的数据细分成三类。下面我们一一来看。
e7b06dcf431f388170ab0a79677ee43f.webp
第一类是线程栈上的本地数据,比如函数执行过程中的局部变量。前面我们说过,函数的调用会使用栈的模型,这在线程里面是一样的。只不过每个线程都有自己的栈空间。栈的大小可以通过命令 ulimit -a 查看,默认情况下线程栈大小为 8192(8MB)。我们可以使用命令 ulimit -s 修改。对于线程栈,可以通过下面这个函数 pthread_attr_t,修改线程栈的大小。

int pthread_attr_setstacksize(pthread_attr_t *attr, size_t stacksize);

主线程在内存中有一个栈空间,其他线程栈也拥有独立的栈空间。为了避免线程之间的栈空间踩踏,线程栈之间还会有小块区域,用来隔离保护各自的栈空间。一旦另一个线程踏入到这个隔离区,就会引发段错误。

第二类数据就是在整个进程里共享的全局数据。例如全局变量,虽然在不同进程中是隔离的,但是在一个进程中是共享的。

第三类数据,线程私有数据(Thread Specific Data),可以通过以下函数创建:

int pthread_key_create(pthread_key_t *key, void (*destructor)(void*))

可以看到,创建一个 key,伴随着一个析构函数。key 一旦被创建,所有线程都可以访问它,但各线程可根据自己的需要往 key 中填入不同的值,这就相当于提供了一个同名而不同值的全局变量。

设置 key 对应的value
int pthread_setspecific(pthread_key_t key, const void *value)
获取 key 对应的 value    
void *pthread_getspecific(pthread_key_t key)

而等到线程退出的时候,就会调用析构函数释放 value。

数据的保护

我们先来看一种方式,Mutex,全称 Mutual Exclusion,中文叫互斥

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define NUM_OF_TASKS 5

int money_of_tom = 100;
int money_of_jerry = 100;
//第一次运行去掉下面这行
pthread_mutex_t g_money_lock;

void *transfer(void *notused)
{
  pthread_t tid = pthread_self();
  printf("Thread %u is transfering money!\n", (unsigned int)tid);
  //第一次运行去掉下面这行
  pthread_mutex_lock(&g_money_lock);
  sleep(rand()%10);
  money_of_tom+=10;
  sleep(rand()%10);
  money_of_jerry-=10;
  //第一次运行去掉下面这行
  pthread_mutex_unlock(&g_money_lock);
  printf("Thread %u finish transfering money!\n", (unsigned int)tid);
  pthread_exit((void *)0);
}

int main(int argc, char *argv[])
{
  pthread_t threads[NUM_OF_TASKS];
  int rc;
  int t;
  //第一次运行去掉下面这行
  pthread_mutex_init(&g_money_lock, NULL);

  for(t=0;t<NUM_OF_TASKS;t++){
    rc = pthread_create(&threads[t], NULL, transfer, NULL);
    if (rc){
      printf("ERROR; return code from pthread_create() is %d\n", rc);
      exit(-1);
    }
  }

  for(t=0;t<100;t++){
    //第一次运行去掉下面这行
    pthread_mutex_lock(&g_money_lock);
    printf("money_of_tom + money_of_jerry = %d\n", money_of_tom + money_of_jerry);
    //第一次运行去掉下面这行
    pthread_mutex_unlock(&g_money_lock);
  }
  //第一次运行去掉下面这行
  pthread_mutex_destroy(&g_money_lock);
  pthread_exit(NULL);
}

我们来编译一下。
gcc mutex.c -lpthread

第一次,未加锁
[root@deployer createthread]# ./a.out
Thread 508479232 is transfering money!
Thread 491693824 is transfering money!
Thread 500086528 is transfering money!
Thread 483301120 is transfering money!
Thread 516871936 is transfering money!
money_of_tom + money_of_jerry = 200
money_of_tom + money_of_jerry = 200
money_of_tom + money_of_jerry = 220
money_of_tom + money_of_jerry = 220
money_of_tom + money_of_jerry = 230
money_of_tom + money_of_jerry = 240
Thread 483301120 finish transfering money!
money_of_tom + money_of_jerry = 240
Thread 508479232 finish transfering money!
Thread 500086528 finish transfering money!
money_of_tom + money_of_jerry = 220
Thread 516871936 finish transfering money!
money_of_tom + money_of_jerry = 210
money_of_tom + money_of_jerry = 210
Thread 491693824 finish transfering money!
money_of_tom + money_of_jerry = 200
money_of_tom + money_of_jerry = 200

第二次,加锁
[root@deployer createthread]# ./a.out
Thread 568162048 is transfering money!
Thread 576554752 is transfering money!
Thread 551376640 is transfering money!
Thread 542983936 is transfering money!
Thread 559769344 is transfering money!
Thread 568162048 finish transfering money!
Thread 576554752 finish transfering money!
money_of_tom + money_of_jerry = 200
money_of_tom + money_of_jerry = 200
money_of_tom + money_of_jerry = 200
Thread 542983936 finish transfering money!
Thread 559769344 finish transfering money!
money_of_tom + money_of_jerry = 200
money_of_tom + money_of_jerry = 200
Thread 551376640 finish transfering money!
money_of_tom + money_of_jerry = 200
money_of_tom + money_of_jerry = 200
money_of_tom + money_of_jerry = 200
money_of_tom + money_of_jerry = 200

使用 Mutex,首先要使用 pthread_mutex_init 函数初始化这个 mutex,初始化后,就可以用它来保护共享变量了。

pthread_mutex_lock() 就是去抢那把锁的函数,如果抢到了,就可以执行下一行程序,对共享变量进行访问;如果没抢到,就被阻塞在那里等待。如果不想被阻塞,可以使用 pthread_mutex_trylock 去抢那把锁,如果抢到了,就可以执行下一行程序,对共享变量进行访问;如果没抢到,不会被阻塞,而是返回一个错误码。

当共享数据访问结束了,使用 pthread_mutex_unlock 释放锁,让给其他人使用,最终调用 pthread_mutex_destroy 销毁掉这把锁。
0ccf37aafa2b287363399e130b2726be.webp

条件变量

为了使锁的使用能够发挥更高的效率,互斥锁和条件变量配合使用。接下来以三个员工抢任务为例,代码如下:

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define NUM_OF_TASKS 3
#define MAX_TASK_QUEUE 11

char tasklist[MAX_TASK_QUEUE]="ABCDEFGHIJ";
//首先创建十个任务
int head = 0;
int tail = 0;
/*表示当前分配的工作从哪里开始,到哪里结束。
如果 head 等于 tail,则当前的工作分配完毕;如果 tail 加 N,就是新分配了 N 个工作。*/

int quit = 0;

pthread_mutex_t g_task_lock;
pthread_cond_t g_task_cv;

void *coder(void *notused)
{
  pthread_t tid = pthread_self();

  while(!quit){

    pthread_mutex_lock(&g_task_lock);
    //每一个员工先要获取锁,这样才能保证一个任务只给一个员工
    while(tail == head){    
      if(quit){
        pthread_mutex_unlock(&g_task_lock);
        pthread_exit((void *)0);
      }
      printf("No task now! Thread %u is waiting!\n", (unsigned int)tid);
      pthread_cond_wait(&g_task_cv, &g_task_lock);
      //没有任务,等待并解锁,收到通知后自动抢锁
      printf("Have task now! Thread %u is grabing the task !\n", (unsigned int)tid);
    }
    char task = tasklist[head++];
    //则取出 head 位置代表的任务 task,然后将 head 加一
    pthread_mutex_unlock(&g_task_lock);
    //当这个员工抢到任务后,pthread_mutex_unlock 解锁,让其他员工可以进来抢任务。
    /*第一次只有一个任务的时候:抢到锁的线程释放锁之后,另外两个员工才能从 pthread_cond_wait 中返回,
    然后也会再次通过 while 判断 head 和 tail 是否相同。不过已经晚了,任务都让人家抢走了,
    head 和 tail 又一样了,所以只好再次进入 pthread_cond_wait,接着等任务。*/
    printf("Thread %u has a task %c now!\n", (unsigned int)tid, task);
    sleep(5);
    //开始任务,摸五秒🐟
    printf("Thread %u finish the task %c!\n", (unsigned int)tid, task);
  }

  pthread_exit((void *)0);
}

int main(int argc, char *argv[])
{
  pthread_t threads[NUM_OF_TASKS];
  int rc;
  int t;

  pthread_mutex_init(&g_task_lock, NULL);
  pthread_cond_init(&g_task_cv, NULL);
  //初始化条件变量和锁

  for(t=0;t<NUM_OF_TASKS;t++){
    rc = pthread_create(&threads[t], NULL, coder, NULL);
    if (rc){
      printf("ERROR; return code from pthread_create() is %d\n", rc);
      exit(-1);
    }
  }

  sleep(5);

  for(t=1;t<=4;t++){
    //十个任务分四批分配
    pthread_mutex_lock(&g_task_lock);
    tail+=t;
    printf("I am Boss, I assigned %d tasks, I notify all coders!\n", t);
    pthread_cond_broadcast(&g_task_cv);
    //通知,对应上面的 pthread_cond_wait
    pthread_mutex_unlock(&g_task_lock);
    sleep(20);
  }

  pthread_mutex_lock(&g_task_lock);
  quit = 1;
  pthread_cond_broadcast(&g_task_cv);
  pthread_mutex_unlock(&g_task_lock);

  pthread_mutex_destroy(&g_task_lock);
  pthread_cond_destroy(&g_task_cv);
  pthread_exit(NULL);
}
//招聘三个员工,一开始没有任务,大家睡大觉
No task now! Thread 3491833600 is waiting!
No task now! Thread 3483440896 is waiting!
No task now! Thread 3475048192 is waiting!
//老板开始分配任务了,第一批任务就一个,告诉三个员工醒来抢任务
I am Boss, I assigned 1 tasks, I notify all coders!
//员工一先发现有任务了,开始抢任务
Have task now! Thread 3491833600 is grabing the task !
//员工一抢到了任务A,开始干活
Thread 3491833600 has a task A now! 
//员工二也发现有任务了,开始抢任务,不好意思,就一个任务,让人家抢走了,接着等吧
Have task now! Thread 3483440896 is grabing the task !
No task now! Thread 3483440896 is waiting!
//员工三也发现有任务了,开始抢任务,你比员工二还慢,接着等吧
Have task now! Thread 3475048192 is grabing the task !
No task now! Thread 3475048192 is waiting!
//员工一把任务做完了,又没有任务了,接着等待
Thread 3491833600 finish the task A !
No task now! Thread 3491833600 is waiting!
//老板又有新任务了,这次是两个任务,叫醒他们
I am Boss, I assigned 2 tasks, I notify all coders!
//这次员工二比较积极,先开始抢,并且抢到了任务B
Have task now! Thread 3483440896 is grabing the task !
Thread 3483440896 has a task B now! 
//这次员工三也聪明了,赶紧抢,要不然没有年终奖了,终于抢到了任务C
Have task now! Thread 3475048192 is grabing the task !
Thread 3475048192 has a task C now! 
//员工一上次抢到了,这次抢的慢了,没有抢到,是不是飘了
Have task now! Thread 3491833600 is grabing the task !
No task now! Thread 3491833600 is waiting!
//员工二做完了任务B,没有任务了,接着等待
Thread 3483440896 finish the task B !
No task now! Thread 3483440896 is waiting!
//员工三做完了任务C,没有任务了,接着等待
Thread 3475048192 finish the task C !
No task now! Thread 3475048192 is waiting!
//又来任务了,这次是三个任务,人人有份
I am Boss, I assigned 3 tasks, I notify all coders!
//员工一抢到了任务D,员工二抢到了任务E,员工三抢到了任务F
Have task now! Thread 3491833600 is grabing the task !
Thread 3491833600 has a task D now! 
Have task now! Thread 3483440896 is grabing the task !
Thread 3483440896 has a task E now! 
Have task now! Thread 3475048192 is grabing the task !
Thread 3475048192 has a task F now! 
//三个员工都完成了,然后都又开始等待
Thread 3491833600 finish the task D !
Thread 3483440896 finish the task E !
Thread 3475048192 finish the task F !
No task now! Thread 3491833600 is waiting!
No task now! Thread 3483440896 is waiting!
No task now! Thread 3475048192 is waiting!
//公司活越来越多了,来了四个任务,赶紧干呀
I am Boss, I assigned 4 tasks, I notify all coders!
//员工一抢到了任务G,员工二抢到了任务H,员工三抢到了任务I
Have task now! Thread 3491833600 is grabing the task !
Thread 3491833600 has a task G now! 
Have task now! Thread 3483440896 is grabing the task !
Thread 3483440896 has a task H now! 
Have task now! Thread 3475048192 is grabing the task !
Thread 3475048192 has a task I now! 
//员工一和员工三先做完了,发现还有一个任务开始抢
Thread 3491833600 finish the task G !
Thread 3475048192 finish the task I !
//员工三没抢到,接着等
No task now! Thread 3475048192 is waiting!
//员工一抢到了任务J,多做了一个任务
Thread 3491833600 has a task J now! 
//员工二这才把任务H做完,黄花菜都凉了,接着等待吧
Thread 3483440896 finish the task H !
No task now! Thread 3483440896 is waiting!
//员工一做完了任务J,接着等待
Thread 3491833600 finish the task J !
No task now! Thread 3491833600 is waiting!

1d4e17fdb1860f7ca7f23bbe682d93f7.webp

总结延伸

总结

02a774d7c0f83bb69fec4662622d6d58.webp

  • 线程复制执行二进制指令
  • 多进程缺点: 创建进程占用资源多; 进程间通信需拷贝内存, 不能共享
  • 线程相关操作
    • pthread_exit(A), A 是线程退出的返回值
    • pthread_attr_t 线程属性, 用辅助函数初始化并设置值; 用完需要销毁
    • pthread_create 创建线程, 四个参数(线程对象, 属性, 运行函数, 运行参数)
    • pthread_join 获取线程退出返回值, 多线程依赖 libpthread.so
    • 一个线程退出, 会发送信号给 其他所有同进程的线程
  • 线程中有三类数据
    • 线程栈本地数据, 栈大小默认 8MB; 线程栈之间有保护间隔, 若误入会引发段错误
    • 进程共享的全局数据
    • 线程级别的全局变量(线程私有数据, pthread_key_create(key, destructer)); key 所有线程都可以访问, 可填入各自的值(同名不同值的全局变量)
  • 数据保护
    • Mutex(互斥), 初始化; lock(没抢到则阻塞)/trylock(没抢到则返回错误码); unlock; destroy
    • 条件变量(通知), 收到通知, 还是要抢锁(由 wait 函数执行); 因此条件变量与互斥锁配合使用
    • 互斥锁所谓条件变量的参数, wait 函数会自动解锁/加锁
    • broadcast(通知); destroy

      延伸

pthread_cond_wait:

  1. 等待条件变量满足;
  2. 把获得的锁释放掉;(注意:1,2两步是一个原子操作) 当然如果条件满足了,那么就不需要释放锁。所以释放锁这一步和等待条件满足一定是一起执行(指原子操作)。
  3. pthread_cond_wait()被唤醒时,它解除阻塞,并且尝试获取锁(不一定拿到锁)。因此,一般在使用的时候都是在一个循环里使用pthread_cond_wait()函数,因为它在返回的时候不一定能拿到锁(这可能会发生饿死情形,当然这取决于操作系统的调度策略)。

这里主要是看代码的时候关于锁的状态产生了一些疑惑,搜索后发现:该函数会释得到的放锁之后,恍然大悟。

比较之下,go 的 goroutine 配合 channel 的实现方式显得尤为简洁。