18.1 理解线程的概念
18.1.1 引入线程的背景
多进程模型的缺点如下:
- 创建进程的过程会带来一定的开销
- 为了完成进程间数据交换,需要特殊的IPC技术
如果是单核的CPU也是可以创建多个进程,只是在使用过程中,需要上处理机下处理机,这样一来非常的不方便,耗费了大量的资源。为了保持多进程的优点并摒弃其缺点,从而引进了多线程的服务。
线程相比进程具有如下优点:
- 线程的创建和上下文切换比进程的创建和上下文切换更快
- 线程间交换数据时无需特殊技术
18.1.2 线程和进程的差异
线程是为了解决什么困难登场的?为了得到更多条代码执行流而复制整个内存区域的负担太重了
每个进程的内存空间都由保存全局变量的“数据区”,向malloc等函数的动态分配提供空间的堆,函数运行时使用的栈构成。如果以获得多个代码执行流为主要目的,则不需要完全分离内存结构,只需要分离栈区域。
- 上下文切换时不需要切换数据区和堆
- 可以利用数据区和堆交换数据
所以说,为了保持这种结构,线程将在进程内创建并运行。
- 进程:在操作系统构成单独执行流的单位
- 线程:在进程构成单独执行流的单位
18.2 线程创建及运行
下面介绍POSIX下线程的创建方法。
18.2.1 线程的创建和执行流程
//线程具有单独的执行流,因此需要单独定义线程的main函数
#include<pthread.h>
int pthread_create(pthread_t *restrict thread, const pthread_attr_t *restrict attr, void *(* start_routine)(void *), void *restrict arg);
//成功时返回0,失败时返回其他值
//thread 保存新创建线程ID的变量地址值。这一点与线程相同,都需要去进行区分不同线程的ID
//attr 用于传递线程属性的参数,传递NULL时,创建默认属性的线程
//start_routine 相当于线程main函数,在单独执行流中执行的函数地址值(函数指针)
//arg 通过第三个参数传递调用函数时包含传递参数信息的变量地址值
//thread1.c
#include<stdio.h>
#include<pthread.h>
#include<unistd.h>
void *thread_main(void *arg);
int main(int argc, char *argv[])
{
pthread_t t_id;
int thread_param=5;
if(pthread_create(&t_id, NULL, thread_main, (void*)&thread_param)!=0)
{
puts("pthread_create() error");
return -1;
};
sleep(10);
puts("end of main");
return 0;
}
void *thread_main(void *arg)
{
int i;
int cnt=*((int*)arg);
for(i=0; i<cnt; i++)
{
sleep(1);
puts("running thread");
}
return NULL;
}
zhang@zhang-virtual-machine:~/Desktop/Ctest/Demo35$ gcc thread1.c -o thread1 -lpthread # 注意-lpthread
zhang@zhang-virtual-machine:~/Desktop/Ctest/Demo35$ ./thread1
running thread
running thread
running thread
running thread
running thread
end of main
编译程序包括 预编译, 编译,汇编,链接,包含头文件了,仅能说明有了线程函数的声明, 但是还没有实现, 加上-lpthread是在链接阶段,链接这个库。pthread是动态库,需要用-lpthread,所有的动态库都需要用-lxxx来引用用gcc编译使用了POSIX thread的程序时通常需要加额外的选项,以便使用thread-safe的库及头文件,一些老的书里说直接增加链接选项 -lpthread 就可以了。而gcc手册里则指出应该在编译和链接时都增加 -pthread 选项编译选项中指定 -pthread 会附加一个宏定义-D_REENTRANT,该宏会导致 libc 头文件选择那些thread-safe的实现;链接选项中指定 -pthread 则同 -lpthread 一样,只表示链接 POSIX thread 库。由于 libc 用于适应 thread-safe 的宏定义可能变化,因此在编译和链接时都使用 -pthread 选项而不是传统的 -lpthread 能够保持向后兼容,并提高命令行的一致性。目前gcc 4.5.2中已经没有了关于-lpthread的介绍了。所以以后的多线程编译应该用-pthread,而不是-lpthread。(原文连接)
如何不通过sleep函数来监控整个线程控制流,标准库中提供了以下函数:
#include<pthread.h>
int pthread_join(pthread_t thread, void **status);
//成功时返回0,失败时返回其他值
//thread 该参数值ID的线程终止后才会从该函数返回
//status 保存线程的main函数返回值的指针变量地址值
//thread2.c
#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>
#include<string.h>
#include<unistd.h>
void *thread_main(void *arg);
int main(int argc, char *argv[])
{
pthread_t t_id;
int thread_param=5;
void *thr_ret;
if(pthread_create(&t_id, NULL, thread_main, (void*)&thread_param)!=0)
{
puts("pthread_create() error");
return -1;
};
if(pthread_join(t_id, &thr_ret)!=0)
{
puts("pthread_join() error");
return -1;
};
printf("Thread return message: %s \n", (char *)thr_ret);
free(thr_ret);
return 0;
}
void *thread_main(void *arg)
{
int i;
int cnt=*((int*)arg);
char *msg=(char *)malloc(sizeof(char)*50);
strcpy(msg, "Hello, I'am thread~\n");
for(i=0; i<cnt; i++)
{
sleep(1);
puts("running thread");
}
return (void*)msg;
}
18.2.2 可在临界区内调用的函数
创建一个线程的方法和多个线程的方法一致,关于多个线程同时调用函数时可能会产生问题。这类函数内部存在临界区,也就是说多个线程同时执行这部分代码的时候,可能会引起问题。临界区中至少存在1条这类代码。
根据临界区是否引起问题,函数可以分为以下两类:
- 线程安全函数
- 非线程安全函数
强调一点:线程安全的函数中同样可能存在临界区,只是在线程安全函数中,同时被多个线程调用时可通过一些措施避免问题。线程安全函数的名称后缀通常为_r。比如该函数gethostbyname和gethostbyname_r后面的就是安全版本,在编写代码中,我们需要将所有的gethostbyname函数替代为gethostbyname_r。同样可以在声明头文件前定义_REENTRANT宏,做法如下:
-D_REENTRANT # 在编译时直接添加该选项
18.2.3 工作线程模型
以上的介绍都只是单线程,并不是多个线程同时进行。如果把线程交给去工作去完成具体的指令,而main函数主要负责输出。所以我们可以理解为该模型是工作线程模型。
//thread3.c
#include<stdio.h>
#include<pthread.h>
void *thread_summation(void *arg);
int sum=0;
int main(int argc, char *argv[])
{
pthread_t id_t1, id_t2;
int range1[]={1,5};
int range2[]={6,10};
pthread_create(&id_t1, NULL, thread_summation, (void *)range1);
pthread_create(&id_t2, NULL, thread_summation, (void *)range2);
pthread_join(id_t1, NULL);
pthread_join(id_t2, NULL);
printf("result: %d\n", sum);
return 0;
}
void *thread_summation(void *arg)
{
int start=((int*)arg)[0];
int end=((int*)arg)[1];
while(start<=end)
{
sum+=start;
start++;
}
return NULL;
}
该运行结果虽然正确,但是存在一些临界区的问题,所以在下面展示问题代码:
//thread4.c
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<pthread.h>
#define NUM_THREAD 100
void *thread_inc(void *arg);
void *thread_des(void *arg);
long long num=0;
int main(int argc, char *argv[])
{
pthread_t thread_id[NUM_THREAD];
int i;
printf("sizeof long long: %ld \n", sizeof(long long));
for(i=0; i<NUM_THREAD; i++)
{
if(i%2)
pthread_create(&(thread_id[i]), NULL, thread_inc, NULL);
else
pthread_create(&(thread_id[i]), NULL, thread_des, NULL);
}
for(i=0;i<NUM_THREAD;i++)
pthread_join(thread_id[i], NULL);
printf("result: %lld \n", num);
return 0;
}
void *thread_inc(void *arg)
{
int i;
for(i=0; i<50000000; i++)
num+=1;
return NULL;
}
void *thread_des(void *arg)
{
int i;
for(i=0; i<50000000; i++)
num-=1;
return NULL;
}
该代码的运行结果不唯一,所以出现了问题,只是该问题现在不清楚是什么。
18.3 线程存在的问题和临界区
下面将分析thread4.c中产生问题的原因,并给出解决方案。
18.3.1 多个线程访问同一变量是问题
其中某一个线程访问的时候应该阻止其他线程的访问,这个就是线程的同步。
18.3.2 临界区位置
定义:函数内同时运行多个线程时引起问题的多条语句构成的代码块
根据上述thread4.c我们可以观察其中引起问题的函数模块:
void *thread_inc(void *arg)
{
int i;
for(i=0; i<50000000; i++)
num+=1; // 临界区
return NULL;
}
void *thread_des(void *arg)
{
int i;
for(i=0; i<50000000; i++)
num-=1; // 临界区
return NULL;
}
产生的问题可以整理为以下三种类型:
- 2个线程同时执行thread_inc函数
- 2个线程同时执行thread_des函数
- 2个线程同时执行thread_inc函数和thread_des函数
所以说2条不同的语句由不同线程同时执行时,是有可能构成临界区。前提是这2条语句访问同一内存空间。
18.4 线程同步
18.4.1 同步的两面性
首先要明确,线程同步是要解决线程访问顺序引发的问题。
- 同时访问同一内存空间时发生的情况
- 需要指定访问同一内存空间的线程执行顺序的情况
18.4.2 互斥量
#include<pthread.h>
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
//成功时返回0,失败时返回其他值
//mutex 创建互斥量时传递保存互斥量的变量地址值,销毁时传递需要销毁的互斥量地址值
//attr 传递即将创建的互斥量属性,没有特别需要指定的属性时传递NULL
//如果不需要特殊的互斥量属性,则向第二个参数传递NULL时,可以利用PTHREAD_MUTEX_INITIALIZER宏进行如下声明:
//pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER(但是推荐使用init函数进行初始化)
#include<pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
//成功时返回0,失败时返回其他值
//其他线程unlock临界区之前,会将当前请求进程一直处于阻塞状态
pthread_mutex_lock(&mutex);
//临界区开始
//.....
//临界区结束
pthread_mutex_unlock(&mutex);
//当线程退出临界区时,如果忘记调用unlock函数,该请求线程就无法摆脱阻塞状态
//这种情况下成为“死锁”
//mutex.c
#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<pthread.h>
#define NUM_THREAD 100
void *thread_inc(void *arg);
void *thread_des(void *arg);
long long num = 0;
pthread_mutex_t mutex;
int main(int argc, char *argv[])
{
pthread_t thread_id[NUM_THREAD];
int i;
pthread_mutex_init(&mutex, NULL);
for(i=0;i<NUM_THREAD;i++)
{
if(i%2)
pthread_create(&(thread_id[i]), NULL, thread_inc, NULL);
else
pthread_create(&(thread_id[i]), NULL, thread_des, NULL);
}
for(i=0;i<NUM_THREAD;i++)
pthread_join(thread_id[i], NULL);
printf("result: %lld\n", num);
pthread_mutex_destroy(&mutex);
return 0;
}
void *thread_inc(void *arg)
{
int i;
pthread_mutex_lock(&mutex);
for(i=0; i<50000000; i++)
num+=1;
pthread_mutex_unlock(&mutex);
return NULL;
}
void *thread_des(void *arg)
{
int i;
for(i=0; i<50000000; i++)
{
pthread_mutex_lock(&mutex);
num-=1;
pthread_mutex_unlock(&mutex);
}
return NULL;
}
由于临界区的划分,上锁解锁会影响时间,所以必须根据合适的需求去划分响应的临界区资源。
18.4.3 信号量
#include<semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_destroy(sem_t *sem);
//成功时返回0,失败时返回其他值
//sem创建信号量时传递保存信号量的变量地址值,销毁时传递需要销毁的信号量变量地址值
//pshared传递其他值时,创建可由多个进程共享的信号量,传递0时,创建只允许1个进程内部使用的信号量,我们需要完成同一进程内的线程同步,所以传递0
//value指定新创建的信号量初始值
#include<semaphore.h>
int sem_post(sem_t *sem);
int sem_wait(sem_t *sem);
//成功时返回0,失败时返回其他值
//sem 传递保存信号量读取值的变量地址值,传递给sem_post时信号量增1,传递给sem_wait时信号量减1
#include<stdio.h>
#include<pthread.h>
#include<semaphore.h>
void *read(void *arg);
void *accu(void *arg);
static sem_t sem_one;
static sem_t sem_two;
static int num;
int main(int argc, char *argv[])
{
pthread_t id_t1, id_t2;
sem_init(&sem_one, 0, 0);
sem_init(&sem_two, 0, 1);
pthread_create(&id_t1, NULL, read, NULL);
pthread_create(&id_t2, NULL, accu, NULL);
pthread_join(id_t1, NULL);
pthread_join(id_t2, NULL);
sem_destroy(&sem_one);
sem_destroy(&sem_two);
return 0;
}
void *read(void *arg)
{
int i;
for(i=0; i<5; i++)
{
fputs("Input num:",stdout);
sem_wait(&sem_two);
scanf("%d", &num);
sem_post(&sem_one);
}
return NULL;
}
void *accu(void *arg)
{
int sum=0, i;
for(i=0; i<5; i++)
{
sem_wait(&sem_one);
sum+=sum;
sem_post(&sem_two);
}
printf("Result: %d\n", sum);
return NULL;
}
18.5 线程的销毁和多线程并发服务器端的实现
18.5.1 销毁线程的3种方式
- 调用pthread_join函数
- 调用pthread_detach函数
#include<pthread.h>
int pthread_detach(pthread_t thread);
//成功时返回0,失败时返回其他值
//thread终止的同时需要销毁的线程ID
18.5.2 多线程并发服务器端的实现
//chat_server.c
//注意临界区代码
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<arpa/inet.h>
#include<sys/socket.h>
#include<pthread.h>
#include<semaphore.h>
#define BUF_SIZE 100
#define MAX_CLNT 256
void *handle_clnt(void *arg);
void send_msg(char *msg, int len);
void error_handling(char *message);
int clnt_cnt=0;
int clnt_socks[MAX_CLNT];
pthread_mutex_t mutx;
int main(int argc, char *argv[])
{
int serv_sock, clnt_sock;
struct sockaddr_in serv_adr, clnt_adr;
int clnt_adr_sz;
pthread_t t_id;
if(argc!=2){
printf("Usage: %s <port>\n", argv[0]);
exit(1);
}
pthread_mutex_init(&mutx, NULL);
serv_sock=socket(PF_INET, SOCK_STREAM, 0);
memset(&serv_adr,0,sizeof(serv_adr));
serv_adr.sin_family=AF_INET;
serv_adr.sin_addr.s_addr=htonl(INADDR_ANY);
serv_adr.sin_port=htons(atoi(argv[1]));
if(bind(serv_sock, (struct sockaddr*)&serv_adr,sizeof(serv_adr))==-1)
error_handling("bind() error");
if(listen(serv_sock, 5)==-1)
error_handling("listne() error");
while(1)
{
clnt_adr_sz=sizeof(clnt_adr);
clnt_sock=accept(serv_sock, (struct sockaddr*)&clnt_adr, &clnt_adr_sz);
pthread_mutex_lock(&mutx);
clnt_socks[clnt_cnt++]=clnt_sock;
pthread_mutex_unlock(&mutx);
pthread_create(&t_id, NULL, handle_clnt, (void*)&clnt_sock);
pthread_detach(t_id);
printf("Connected client IP:%s \n", inet_ntoa(clnt_adr.sin_addr));
}
close(serv_sock);
return 0;
}
void *handle_clnt(void *arg)
{
int clnt_sock=*((int*)arg);
int str_len=0,i;
char msg[BUF_SIZE];
while((str_len=read(clnt_sock, msg, sizeof(msg)))!=0)
send_msg(msg, str_len);
pthread_mutex_lock(&mutx);
for(i=0;i<clnt_cnt;i++)
{
if(clnt_sock==clnt_socks[i])
{
while(i++<clnt_cnt-1)
clnt_socks[i]=clnt_socks[i+1];
break;
}
}
clnt_cnt--;
pthread_mutex_unlock(&mutx);
close(clnt_sock);
return NULL;
}
void send_msg(char *msg, int len)// send to all
{
int i;
pthread_mutex_lock(&mutx);
for(i=0; i<clnt_cnt; i++)
write(clnt_socks[i], msg, len);
pthread_mutex_unlock(&mutx);
}
void error_handling(char *message)
{
fputs(message, stderr);
fputc('\n', stderr);
exit(1);
}
zhang@zhang-virtual-machine:~/Desktop/Ctest/Demo36$ gcc chat_server.c -D_REENTRANT -o cserv -pthread
//chat_clnt.c
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<arpa/inet.h>
#include<sys/socket.h>
#include<pthread.h>
#include<semaphore.h>
#define BUF_SIZE 100
#define NAME_SIZE 20
void *send_msg(void *arg);
void *recv_msg(void *arg);
void error_handling(char *message);
char name[NAME_SIZE]="[DEFAULT]";
char msg[BUF_SIZE];
int main(int argc, char *argv[])
{
int sock;
struct sockaddr_in serv_addr;
pthread_t snd_thread, rcv_thread;
void *thread_return;
if(argc!=4){
printf("Usage:%s<IP> <port> <name>\n",argv[0]);
exit(1);
}
sprintf(name, "[%s]", argv[3]);
sock=socket(PF_INET, SOCK_STREAM, 0);
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family=AF_INET;
serv_addr.sin_addr.s_addr=inet_addr(argv[1]);
serv_addr.sin_port=htons(atoi(argv[2]));
if(connect(sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr))==-1)
error_handling("connect() error");
pthread_create(&snd_thread, NULL, send_msg, (void*)&sock);
pthread_create(&rcv_thread, NULL, recv_msg, (void*)&sock);
pthread_join(snd_thread, &thread_return);
pthread_join(rcv_thread, &thread_return);
close(sock);
return 0;
}
void *send_msg(void *arg) //send thread main
{
int sock=*((int*)arg);
char name_msg[NAME_SIZE+BUF_SIZE];
while(1)
{
fgets(msg, BUF_SIZE, stdin);
if(!strcmp(msg, "q\n")||!strcmp(msg,"Q\n"))
{
close(sock);
exit(0);
}
sprintf(name_msg, "%s %s", name, msg);
write(sock, name_msg, strlen(name_msg));
}
return NULL;
}
void *recv_msg(void *arg) //read thread main
{
int sock=*((int*)arg);
char name_msg[NAME_SIZE+BUF_SIZE];
int str_len;
while(1)
{
str_len=read(sock, name_msg, NAME_SIZE+BUF_SIZE-1);
if(str_len==-1)
return (void*)-1;
name_msg[str_len]=0;
fputs(name_msg, stdout);
}
return NULL;
}
void error_handling(char *message)
{
fputs(message, stderr);
fputc('\n', stderr);
exit(1);
}
# server
zhang@zhang-virtual-machine:~/Desktop/Ctest/Demo36$ ./cserv 9190
Connected client IP:127.0.0.1
Connected client IP:127.0.0.1
# client_one
zhang@zhang-virtual-machine:~/Desktop/Ctest/Demo36$ ./cclnt 127.0.0.1 9190 Zhang
nihao
[Zhang] nihao
[JY] sad
oh?
[Zhang] oh?
# client_two
zhang@zhang-virtual-machine:~/Desktop/Ctest/Demo36$ ./cclnt 127.0.0.1 9190 JY
sad
[JY] sad
[Zhang] oh?