线程

什么是线程

线程(thread)是操作系统能够进行「运算调度」的最小单位,其是进程中的一个执行任务(控制单元),负责当前进程中程序的执行。

一个进程至少有一个线程,一个进程可以运行多个线程,这些线程共享同一块内存,线程之间可以共享对象、资源,如果有冲突或需要协同,还可以随时沟通以解决冲突或保持同步

进程仅仅是在内存中开辟一块空间(提供线程工作所需的资源),线程真正被CPU执行,线程需要的资源跟所在进程的要

为什么要用线程

开设线程的消耗远远小于进程,开一个新的进程需要先申请新的内存空间,而一个进程可以开设多个线程,无需再另外申请空间,并且一个进程内的多个线程数据是共享的

线程与进程区别

  • 本质区别:进程是操作系统资源分配的基本单位,而线程是任务调度和执行的基本单位
  • 在开销方面:每个进程都有独立的代码和数据空间(程序上下文),程序之间的切换会有较大的开销;线程可以看做轻量级的进程,同一类线程共享代码和数据空间,每个线程都有自己独立的运行栈和程序计数器(PC),线程之间切换的开销小
  • 「所处环境」:在操作系统中能同时运行多个进程(程序);而在同一个进程(程序)中有多个线程同时执行(通过CPU调度,在每个时间片中只有一个线程执行)
  • 内存分配方面:系统在运行的时候会为每个进程分配不同的内存空间;而对线程而言,除了CPU外,系统不会为线程分配内存(线程所使用的资源来自其所属进程的资源),线程组之间只能共享资源
  • 包含关系:没有线程的进程可以看做是单线程的,如果一个进程内有多个线程,则执行过程不是一条线的,而是多条线(线程)共同完成的;线程是进程的一部分,所以线程也被称为轻权进程或者轻量级进程

举个例子:进程=火车,线程=车厢

  • 线程在进程下行进(单纯的车厢无法运行)
  • 一个进程可以包含多个线程(一辆火车可以有多个车厢)
  • 不同进程间数据很难共享(一辆火车上的乘客很难换到另外一辆火车,比如站点换乘)
  • 同一进程下不同线程间数据很易共享(A车厢换到B车厢很容易)
  • 进程要比线程消耗更多的计算机资源(采用多列火车相比多个车厢更耗资源)

注意:进程是资源分配的最小单位,线程是CPU调度的最小单位,每一个进程中至少有一个线程。

python创建线程

方式一

  1. # 利用函数
  2. from threading import Thread
  3. import time
  4. def task(name):
  5. print(f"{name} is running")
  6. time.sleep(3)
  7. print(f'{name} is over')
  8. # 创建线程无需在__main__下面编写 但是为了统一 还是习惯在子代码中写
  9. t = Thread(target=task, args=('kevin',))
  10. t.start() # 创建线程的开销极小 几乎是一瞬间就可以创建
  11. print('主线程')
  12. # kevin is running
  13. # 主线程
  14. # kevin is over

方式二

  1. # 利用类
  2. from threading import Thread
  3. import time
  4. class MyThread(Thread):
  5. def __init__(self, username):
  6. super().__init__()
  7. self.username = username
  8. def run(self):
  9. print(f'{self.username} is running')
  10. time.sleep(3)
  11. print(f'{self.username} is over')
  12. t = MyThread('kevin')
  13. t.start()d
  14. print('主线程')
  15. # kevin is running
  16. # 主线程
  17. # kevin is over

线程实现TCP并发

服务端

  1. from threading import Thread
  2. import socket
  3. HOST = '127.0.0.1'
  4. PORT = 8080
  5. server = socket.socket()
  6. server.bind((HOST, PORT))
  7. server.listen()
  8. def talk(sock):
  9. while True:
  10. data = sock.recv(1024)
  11. print(data.decode('utf8'))
  12. sock.send(data.upper())
  13. while True:
  14. sock, addr = server.accept()
  15. t = Thread(target=talk, args=(sock,))
  16. t.start()

客户端

  1. import socket
  2. client = socket.socket()
  3. client.connect(('127.0.0.1', 8080))
  4. while True:
  5. client.send(b'hello world')
  6. data = client.recv(1024)
  7. print(data.decode('utf8'))

线程对象属性和方法

join方法

让主线程等待子线程运行完毕再执行

  1. from threading import Thread
  2. import time
  3. def task(name):
  4. print(f'{name} is running')
  5. time.sleep(3)
  6. print(f'{name} is over')
  7. t = Thread(target=task, args=('kevin',))
  8. t.start()
  9. t.join() # 主线程代码等待子线程代码运行完毕之后再往下执行
  10. print('主线程')
  11. # kevin is running
  12. # kevin is over
  13. # 主线程

统计活跃的线程数

active_count()

  1. from threading import Thread,active_count
  2. import os
  3. import time
  4. def task():
  5. time.sleep(3)
  6. print('子线程获取进程号>>>:',os.getpid())
  7. if __name__ == '__main__':
  8. t = Thread(target=task)
  9. t.start()
  10. print(active_count())
  11. print('主线程获取进程号>>>:',os.getpid())

获取线程的名字

current_thread().name

  1. from threading import Thread,active_count,current_thread
  2. import os
  3. import time
  4. def task():
  5. time.sleep(3)
  6. print('子线程获取进程号>>>:',os.getpid())
  7. print(current_thread().name)
  8. if __name__ == '__main__':
  9. t = Thread(target=task)
  10. t.start()
  11. t.join()
  12. print(current_thread().name)
  13. print('主线程获取进程号>>>:',os.getpid())

通过类来获取名字

  1. from threading import Thread,active_count,current_thread
  2. import os
  3. import time
  4. class MyThread(Thread):
  5. def run(self):
  6. print(self.name)
  7. if __name__ == '__main__':
  8. t1 = MyThread()
  9. t2 = MyThread()
  10. t1.start()
  11. t2.start()
  12. # Thread-1
  13. # Thread-2

同一个进程内多个线程数据共享

  1. from threading import Thread
  2. money = 999
  3. def task():
  4. global money
  5. money = 666
  6. t = Thread(target=task)
  7. t.start()
  8. t.join()
  9. print(money)
  10. # 666

守护进程

  1. from threading import Thread
  2. import time
  3. def task(name):
  4. print(f'{name} is running')
  5. time.sleep(3)
  6. print(f'{name} is over')
  7. t = Thread(target=task, args=('kevin',))
  8. t.daemon = True
  9. t.start()
  10. print('主线程')
  11. # kevin is running
  12. # 主线程

计算密集型与IO密集型程序

计算密集型

  1. 计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。直言比喻 比如造原子弹 需要大量计算这种

IO密集型

大部分的程序在运行时,都需要大量IO操作,比如网络数据的收发,大文件的读写,这样的程序称为IO密集型程序。

IO密集型程序在运行时,需要大量的时间进行等待,如果IO操作不完成,程序无法执行后面的操作,一直处于等待状态,导致CPU空闲。

由于GIL的存在,同一时刻只能有一个线程执行,在程序进行IO操作时,CPU实际并没有做任何工作,程序执行效率非常低。

为了提高CPU的使用率,Python解释在程序执行IO等待时,会释放GIL锁,让其它线程执行,提高Python程序的执行效率。

所以,GIL对于IO密集型的影响很小,多线程适合用来做IO密集型的程序。

GIL全局解释器锁

官方文档

  1. In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly<br />because CPythons memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

理解

  1. GIL只存在于CPython解释器中,不是python的特征,GIL是一把互斥锁,用于阻止同一个进程下的多个线程同时执行。原因是因为CPython解释器中的垃圾回收机制不是线程安全的。
  2. 反向验证GIL的存在如果不存在会产生垃圾回收机制与正常线程之间数据错乱,GIL是加在CPython解释器上面的互斥锁,同一个进程下的多个线程要想执行必须先抢GIL锁,所以同一个进程下多个线程肯定不能同时运行,即无法利用多核优势。但是多个任务都是IO密集型的那么多线程优势就体现出来了(消耗的资源更少)

GIL和互斥锁的区别

验证GIL的存在

  1. from threading import Thread, Lock
  2. import time
  3. money = 100
  4. def task():
  5. global money
  6. # time.sleep(0.1)
  7. money -= 1
  8. for i in range(100): # 创建100个线程
  9. t = Thread(target=task)
  10. t.start()
  11. print(money)
  12. # 0

同一个进程下的多个线程虽然有GIL的存在不会出现并行的效果,但是如果线程内有IO操作还是会造成数据的错乱,这个时候需要额外的添加互斥锁

  1. from threading import Thread, Lock
  2. import time
  3. money = 100
  4. mutex = Lock()
  5. def task():
  6. global money
  7. mutex.acquire()
  8. tmp = money
  9. time.sleep(0.1)
  10. money = tmp - 1
  11. mutex.release()
  12. # 抢锁放锁也有简便写法(with上下文管理)
  13. t_list = []
  14. for i in range(100):
  15. t = Thread(target=task)
  16. t.start()
  17. t_list.append(t)
  18. for t in t_list:
  19. t.join()
  20. # 应该等待所有的线程运行完毕再打印money
  21. print(money)
  22. # 100

验证多线程的作用

计算密集型

  1. 计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。

多进程

  1. from multiprocessing import Process
  2. import os
  3. import time
  4. def task():
  5. res = 1
  6. for i in range(1, 10000):
  7. res *= i
  8. if __name__ == '__main__':
  9. print(os.cpu_count()) # 查看当前计算机CPU个数
  10. start_time = time.time()
  11. p_list = []
  12. for i in range(10):
  13. p = Process(target=task)
  14. p.start()
  15. p_list.append(p)
  16. for p in p_list:
  17. p.join()
  18. print('计算密集型,多进程执行总耗时:%s' % (time.time() - start_time))
  19. # 计算密集型,多进程执行总耗时:0.07307910919189453

多线程

  1. from threading import Thread
  2. import time
  3. def task():
  4. res = 1
  5. for i in range(1, 10000):
  6. res *= i
  7. start_time = time.time()
  8. t_list = []
  9. for i in range(10):
  10. t = Thread(target=task)
  11. t.start()
  12. t_list.append(t)
  13. for t in t_list:
  14. t.join()
  15. print('计算密集型,多线程执行总耗时:%s' % (time.time() - start_time))
  16. # 计算密集型,多线程执行总耗时:0.29865002632141113

结论

经过两者对比可以看出,在计算密集型多进程多线程更有优势

IO密集型

  1. 大部分的程序在运行时,都需要大量IO操作,比如网络数据的收发,大文件的读写,这样的程序称为IO密集型程序。IO密集型程序在运行时,需要大量的时间进行等待,如果IO操作不完成,程序无法执行后面的操作,一直处于等待状态,导致CPU空闲。由于GIL的存在,同一时刻只能有一个线程执行,在程序进行IO操作时,CPU实际并没有做任何工作,程序执行效率非常低。
  2. 为了提高CPU的使用率,Python解释在程序执行IO等待时,会释放GIL锁,让其它线程执行,提高Python程序的执行效率。

所以,GIL对于IO密集型的影响很小,多线程适合用来做IO密集型的程序。

多进程

  1. from multiprocessing import Process
  2. import time
  3. def task():
  4. time.sleep(0.1) # 模拟纯IO操作
  5. if __name__ == '__main__':
  6. start_time = time.time()
  7. p_list = []
  8. for i in range(100):
  9. p = Process(target=task)
  10. p.start()
  11. p_list.append(p)
  12. for p in p_list:
  13. p.join()
  14. print('IO密集型,多进程总耗时:%s' % (time.time() - start_time))
  15. # IO密集型,多进程总耗时:0.26693105697631836

多线程

  1. from threading import Thread
  2. import time
  3. def task():
  4. time.sleep(0.1) # 模拟纯IO操作
  5. start_time = time.time()
  6. t_list = []
  7. for i in range(10):
  8. t = Thread(target=task)
  9. t.start()
  10. t_list.append(t)
  11. for t in t_list:
  12. t.join()
  13. print('IO密集型,多线程总耗时:%s' % (time.time() - start_time))
  14. # IO密集型,多线程总耗时:0.10392904281616211

结论

经过两者对比可以看出,在IO密集型线进程进线程更有优势

死锁现象

  1. 死锁现象指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程
  2. 在多道程序系统中,由于多个进程的并发执行,改善了系统资源的利用率并提高了系统的处理能力。然而,多个进程的并发执行也带来了新的问题——死锁。所谓死锁是指多个进程因竞争资源而造成的一种僵局,若无外力作用,这些进程都将无法向前推进。

实现死锁现象

  1. from threading import Thread, Lock
  2. import time
  3. mutexA = Lock()
  4. mutexB = Lock()
  5. class MyThread(Thread):
  6. def run(self):
  7. self.f1()
  8. self.f2()
  9. def f1(self):
  10. mutexA.acquire()
  11. print(f'{self.name}抢到了A锁')
  12. mutexB.acquire()
  13. print(f'{self.name}抢到了B锁')
  14. mutexB.release()
  15. mutexA.release()
  16. def f2(self):
  17. mutexB.acquire()
  18. print(f'{self.name}抢到了B锁')
  19. time.sleep(2)
  20. mutexA.acquire()
  21. print(f'{self.name}抢到了A锁')
  22. mutexA.release()
  23. mutexB.release()
  24. for i in range(10):
  25. t = MyThread()
  26. t.start()

原因

  1. 线程1 先开始执行func1,分别拿到AB锁,然后再释放AB锁,线程1继续先执行func2,先拿到了B锁,开始sleep,这时线程2拿到了A锁。这时候就形成了僵局,线程2想要线程1手里的B锁,线程1想要线程2里的A锁。

解决死锁现象

  1. from threading import Thread, RLock
  2. import time
  3. mutexA = RLock()
  4. mutexB = mutexA
  5. class MyThread(Thread):
  6. def run(self):
  7. self.f1()
  8. self.f2()
  9. def f1(self):
  10. mutexA.acquire()
  11. print(f'{self.name}抢到了A锁')
  12. mutexB.acquire()
  13. print(f'{self.name}抢到了B锁')
  14. mutexB.release()
  15. mutexA.release()
  16. def f2(self):
  17. mutexB.acquire()
  18. print(f'{self.name}抢到了B锁')
  19. time.sleep(2)
  20. mutexA.acquire()
  21. print(f'{self.name}抢到了A锁')
  22. mutexA.release()
  23. mutexB.release()
  24. for i in range(10):
  25. t = MyThread()
  26. t.start()

信号量

  1. 在互斥锁中同时只允许一个线程更改数据,而在信号量`Semaphore`是同时(无序轮流)允许一定数量(设置参数)的线程更改数据 ,内置的计数器,每当调用`acquire()`时减1,调用`release()`时加1。计数器不能小于0,当计数器为0时,`acquire()`将阻塞线程至同步锁定状态,直到其他线程调用`release()`
  1. from threading import Thread, Semaphore
  2. import time
  3. import random
  4. sp = Semaphore(5) # 每次最大5个线程运行
  5. def task(num):
  6. sp.acquire() # 抢锁
  7. print("线程%s,抢锁" % num)
  8. time.sleep(random.randint(1, 5))
  9. sp.release() # 放锁
  10. print("线程%s,放锁" % num)
  11. for i in range(20):
  12. t = Thread(target=task, args=(i,))
  13. t.start()

event事件

事件event中有一个全局内置标志Flag,值为 True 或者False。使用wait()函数的线程会处于阻塞状态,此时Flag指为False,直到有其他线程调用set()函数让全局标志Flag置为True,其阻塞的线程立刻恢复运行,还可以用isSet()函数检查当前的Flag状态.

函数 描述
set() 全局内置标志Flag,将标志Flag 设置为 True,通知在等待状态(wait)的线程恢复运行;
isSet() 获取标志Flag当前状态,返回True 或者 False;
wait() 一旦调用,线程将会处于阻塞状态,直到等待其他线程调用set()函数恢复运行;
clear() 将标志设置为False;
  1. from threading import Thread, Event
  2. import time
  3. import random
  4. event = Event() # 创建event事件,造了一个红绿灯
  5. def light():
  6. print('红灯亮,所有人不能动')
  7. time.sleep(3)
  8. print('绿灯亮,油门踩到底,冲!!!')
  9. event.set()
  10. def car(name):
  11. print("选手%s号,正在等红灯" % name)
  12. event.wait()
  13. print("选手%s号加油门,飙车了" % name)
  14. t = Thread(target=light)
  15. t.start()
  16. for i in range(20):
  17. t = Thread(target=car, args=(i,))
  18. t.start()

补充:事件Event主要用于唤醒正在阻塞等待状态的线程

进程池与线程池(重点)

  1. **保证硬件能够正常工作的情况下,最大限度的利用资源。**如何理解这句话呢?比如TCP服务端实现了并发效果,来一个人就开始一个进程或线程来服务这个人,那么如果来一个亿人呢?无论是开设进程还是线程都需要消耗资源,只是开设线程的资源会比进程小一点,但是也经不起一个一个开,瓶颈是计算机硬件资源无法跟上。
  2. **“池”的意思就是在保证计算机能够正常运行的前提下,能最大限度开采的资源的量,虽然降低了程序的运行效率,但是保证了计算机硬件的安全,而让程序可以正常运行**

Python提供了 concurrent.futures 模块,这个模块具有线程池(ThreadPoolExecutor)和进程`池(ProcessPoolExecutor`),用来管理并行编程任务、处理非确定性的执行流程、进程/线程同步等功能

常用方法:

  • submit(fn, *args, **kwargs):将 fn 函数提交给线程池。*args 代表传给fn函数的参数,*kwargs 代表以关键字参数的形式为 fn 函数传入参数。
  • map(func, *iterables, timeout=None, chunksize=1):该函数类似于全局函数 map(func, *iterables),只是该函数将会启动多个线程,以异步方式立即对 iterables执行map处理。
  • shutdown(wait=True):关闭线程池。
  • cancel():取消某个任务
  • done():判断某一个线程是否完成
  • result(timeout=None):取得结果,如果代表的线程任务还未完成,该方法将会阻塞当前线程,其中 timeout 参数指定最多阻塞多少秒。

官方文档:https://docs.python.org/zh-cn/3/library/concurrent.futures.html

进程池

提前创建好固定数量的进程,后续反复使用这些进程

  1. from concurrent.futures import ProcessPoolExecutor
  2. import os, time, random
  3. def task(n):
  4. print('PID: %s is running' % os.getpid())
  5. time.sleep(random.randint(1, 3))
  6. return n ** 2
  7. if __name__ == '__main__':
  8. pool = ProcessPoolExecutor(5) # 线程池线程数默认是CPU个数的五倍 也可以自定义
  9. futures = []
  10. for i in range(11):
  11. t = pool.submit(task, i) # 将可调用对象封装为异步执行
  12. futures.append(t)
  13. pool.shutdown(True) # 等待返回结果才能继续往下执行
  14. for future in futures:
  15. print(future.result())

线程池

提前创建好固定数量的线程,后续反复使用这些线程

  1. from concurrent.futures import ThreadPoolExecutor
  2. import os, time, random
  3. def task(n):
  4. print('PID: %s is running' % os.getpid())
  5. time.sleep(random.randint(1, 3))
  6. return n ** 2
  7. def fn(n):
  8. print(n.result())
  9. if __name__ == '__main__':
  10. pool = ThreadPoolExecutor(5) # 线程池线程数默认是CPU个数的五倍 也可以自定义
  11. for i in range(11):
  12. pool.submit(task, i).add_done_callback(fn) # 一旦某个线程执行完成任务,就会执行执行fn方法,并且fn方法的第一个参数就是future对象

总结

需要掌握的就这三行代码

  1. from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
  2. pool = ThreadPoolExecutor()
  3. pool.submit(task, i).add_done_callback(call_back)