线程
什么是线程
线程(thread)是操作系统能够进行「运算调度」的最小单位,其是进程中的一个执行任务(控制单元),负责当前进程中程序的执行。
一个进程至少有一个线程,一个进程可以运行多个线程,这些线程共享同一块内存,线程之间可以共享对象、资源,如果有冲突或需要协同,还可以随时沟通以解决冲突或保持同步
进程仅仅是在内存中开辟一块空间(提供线程工作所需的资源),线程真正被CPU执行,线程需要的资源跟所在进程的要
为什么要用线程
开设线程的消耗远远小于进程,开一个新的进程需要先申请新的内存空间,而一个进程可以开设多个线程,无需再另外申请空间,并且一个进程内的多个线程数据是共享的
线程与进程区别
- 本质区别:进程是操作系统资源分配的基本单位,而线程是任务调度和执行的基本单位
- 在开销方面:每个进程都有独立的代码和数据空间(程序上下文),程序之间的切换会有较大的开销;线程可以看做轻量级的进程,同一类线程共享代码和数据空间,每个线程都有自己独立的运行栈和程序计数器(PC),线程之间切换的开销小
- 「所处环境」:在操作系统中能同时运行多个进程(程序);而在同一个进程(程序)中有多个线程同时执行(通过CPU调度,在每个时间片中只有一个线程执行)
- 内存分配方面:系统在运行的时候会为每个进程分配不同的内存空间;而对线程而言,除了CPU外,系统不会为线程分配内存(线程所使用的资源来自其所属进程的资源),线程组之间只能共享资源
- 包含关系:没有线程的进程可以看做是单线程的,如果一个进程内有多个线程,则执行过程不是一条线的,而是多条线(线程)共同完成的;线程是进程的一部分,所以线程也被称为轻权进程或者轻量级进程
举个例子:进程=火车,线程=车厢
- 线程在进程下行进(单纯的车厢无法运行)
- 一个进程可以包含多个线程(一辆火车可以有多个车厢)
- 不同进程间数据很难共享(一辆火车上的乘客很难换到另外一辆火车,比如站点换乘)
- 同一进程下不同线程间数据很易共享(A车厢换到B车厢很容易)
- 进程要比线程消耗更多的计算机资源(采用多列火车相比多个车厢更耗资源)
注意:进程是资源分配的最小单位,线程是CPU调度的最小单位,每一个进程中至少有一个线程。
python创建线程
方式一
# 利用函数
from threading import Thread
import time
def task(name):
print(f"{name} is running")
time.sleep(3)
print(f'{name} is over')
# 创建线程无需在__main__下面编写 但是为了统一 还是习惯在子代码中写
t = Thread(target=task, args=('kevin',))
t.start() # 创建线程的开销极小 几乎是一瞬间就可以创建
print('主线程')
# kevin is running
# 主线程
# kevin is over
方式二
# 利用类
from threading import Thread
import time
class MyThread(Thread):
def __init__(self, username):
super().__init__()
self.username = username
def run(self):
print(f'{self.username} is running')
time.sleep(3)
print(f'{self.username} is over')
t = MyThread('kevin')
t.start()d
print('主线程')
# kevin is running
# 主线程
# kevin is over
线程实现TCP并发
服务端
from threading import Thread
import socket
HOST = '127.0.0.1'
PORT = 8080
server = socket.socket()
server.bind((HOST, PORT))
server.listen()
def talk(sock):
while True:
data = sock.recv(1024)
print(data.decode('utf8'))
sock.send(data.upper())
while True:
sock, addr = server.accept()
t = Thread(target=talk, args=(sock,))
t.start()
客户端
import socket
client = socket.socket()
client.connect(('127.0.0.1', 8080))
while True:
client.send(b'hello world')
data = client.recv(1024)
print(data.decode('utf8'))
线程对象属性和方法
join方法
让主线程等待子线程运行完毕再执行
from threading import Thread
import time
def task(name):
print(f'{name} is running')
time.sleep(3)
print(f'{name} is over')
t = Thread(target=task, args=('kevin',))
t.start()
t.join() # 主线程代码等待子线程代码运行完毕之后再往下执行
print('主线程')
# kevin is running
# kevin is over
# 主线程
统计活跃的线程数
active_count()
from threading import Thread,active_count
import os
import time
def task():
time.sleep(3)
print('子线程获取进程号>>>:',os.getpid())
if __name__ == '__main__':
t = Thread(target=task)
t.start()
print(active_count())
print('主线程获取进程号>>>:',os.getpid())
获取线程的名字
current_thread().name
from threading import Thread,active_count,current_thread
import os
import time
def task():
time.sleep(3)
print('子线程获取进程号>>>:',os.getpid())
print(current_thread().name)
if __name__ == '__main__':
t = Thread(target=task)
t.start()
t.join()
print(current_thread().name)
print('主线程获取进程号>>>:',os.getpid())
通过类来获取名字
from threading import Thread,active_count,current_thread
import os
import time
class MyThread(Thread):
def run(self):
print(self.name)
if __name__ == '__main__':
t1 = MyThread()
t2 = MyThread()
t1.start()
t2.start()
# Thread-1
# Thread-2
同一个进程内多个线程数据共享
from threading import Thread
money = 999
def task():
global money
money = 666
t = Thread(target=task)
t.start()
t.join()
print(money)
# 666
守护进程
from threading import Thread
import time
def task(name):
print(f'{name} is running')
time.sleep(3)
print(f'{name} is over')
t = Thread(target=task, args=('kevin',))
t.daemon = True
t.start()
print('主线程')
# kevin is running
# 主线程
计算密集型与IO密集型程序
计算密集型
计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。直言比喻 比如造原子弹 需要大量计算这种
IO密集型
大部分的程序在运行时,都需要大量IO操作,比如网络数据的收发,大文件的读写,这样的程序称为IO密集型程序。
IO密集型程序在运行时,需要大量的时间进行等待,如果IO操作不完成,程序无法执行后面的操作,一直处于等待状态,导致CPU空闲。
由于GIL的存在,同一时刻只能有一个线程执行,在程序进行IO操作时,CPU实际并没有做任何工作,程序执行效率非常低。
为了提高CPU的使用率,Python解释在程序执行IO等待时,会释放GIL锁,让其它线程执行,提高Python程序的执行效率。
所以,GIL对于IO密集型的影响很小,多线程适合用来做IO密集型的程序。
GIL全局解释器锁
官方文档
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly<br />because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
理解
GIL只存在于CPython解释器中,不是python的特征,GIL是一把互斥锁,用于阻止同一个进程下的多个线程同时执行。原因是因为CPython解释器中的垃圾回收机制不是线程安全的。
反向验证GIL的存在如果不存在会产生垃圾回收机制与正常线程之间数据错乱,GIL是加在CPython解释器上面的互斥锁,同一个进程下的多个线程要想执行必须先抢GIL锁,所以同一个进程下多个线程肯定不能同时运行,即无法利用多核优势。但是多个任务都是IO密集型的那么多线程优势就体现出来了(消耗的资源更少)
GIL和互斥锁的区别
验证GIL的存在
from threading import Thread, Lock
import time
money = 100
def task():
global money
# time.sleep(0.1)
money -= 1
for i in range(100): # 创建100个线程
t = Thread(target=task)
t.start()
print(money)
# 0
同一个进程下的多个线程虽然有GIL的存在不会出现并行的效果,但是如果线程内有IO操作还是会造成数据的错乱,这个时候需要额外的添加互斥锁
from threading import Thread, Lock
import time
money = 100
mutex = Lock()
def task():
global money
mutex.acquire()
tmp = money
time.sleep(0.1)
money = tmp - 1
mutex.release()
# 抢锁放锁也有简便写法(with上下文管理)
t_list = []
for i in range(100):
t = Thread(target=task)
t.start()
t_list.append(t)
for t in t_list:
t.join()
# 应该等待所有的线程运行完毕再打印money
print(money)
# 100
验证多线程的作用
计算密集型
计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。
多进程
from multiprocessing import Process
import os
import time
def task():
res = 1
for i in range(1, 10000):
res *= i
if __name__ == '__main__':
print(os.cpu_count()) # 查看当前计算机CPU个数
start_time = time.time()
p_list = []
for i in range(10):
p = Process(target=task)
p.start()
p_list.append(p)
for p in p_list:
p.join()
print('计算密集型,多进程执行总耗时:%s' % (time.time() - start_time))
# 计算密集型,多进程执行总耗时:0.07307910919189453
多线程
from threading import Thread
import time
def task():
res = 1
for i in range(1, 10000):
res *= i
start_time = time.time()
t_list = []
for i in range(10):
t = Thread(target=task)
t.start()
t_list.append(t)
for t in t_list:
t.join()
print('计算密集型,多线程执行总耗时:%s' % (time.time() - start_time))
# 计算密集型,多线程执行总耗时:0.29865002632141113
结论
经过两者对比可以看出,在计算密集型下多进程比多线程更有优势
IO密集型
大部分的程序在运行时,都需要大量IO操作,比如网络数据的收发,大文件的读写,这样的程序称为IO密集型程序。IO密集型程序在运行时,需要大量的时间进行等待,如果IO操作不完成,程序无法执行后面的操作,一直处于等待状态,导致CPU空闲。由于GIL的存在,同一时刻只能有一个线程执行,在程序进行IO操作时,CPU实际并没有做任何工作,程序执行效率非常低。
为了提高CPU的使用率,Python解释在程序执行IO等待时,会释放GIL锁,让其它线程执行,提高Python程序的执行效率。
所以,GIL对于IO密集型的影响很小,多线程适合用来做IO密集型的程序。
多进程
from multiprocessing import Process
import time
def task():
time.sleep(0.1) # 模拟纯IO操作
if __name__ == '__main__':
start_time = time.time()
p_list = []
for i in range(100):
p = Process(target=task)
p.start()
p_list.append(p)
for p in p_list:
p.join()
print('IO密集型,多进程总耗时:%s' % (time.time() - start_time))
# IO密集型,多进程总耗时:0.26693105697631836
多线程
from threading import Thread
import time
def task():
time.sleep(0.1) # 模拟纯IO操作
start_time = time.time()
t_list = []
for i in range(10):
t = Thread(target=task)
t.start()
t_list.append(t)
for t in t_list:
t.join()
print('IO密集型,多线程总耗时:%s' % (time.time() - start_time))
# IO密集型,多线程总耗时:0.10392904281616211
结论
经过两者对比可以看出,在IO密集型下线进程比进线程更有优势
死锁现象
死锁现象指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程
在多道程序系统中,由于多个进程的并发执行,改善了系统资源的利用率并提高了系统的处理能力。然而,多个进程的并发执行也带来了新的问题——死锁。所谓死锁是指多个进程因竞争资源而造成的一种僵局,若无外力作用,这些进程都将无法向前推进。
实现死锁现象
from threading import Thread, Lock
import time
mutexA = Lock()
mutexB = Lock()
class MyThread(Thread):
def run(self):
self.f1()
self.f2()
def f1(self):
mutexA.acquire()
print(f'{self.name}抢到了A锁')
mutexB.acquire()
print(f'{self.name}抢到了B锁')
mutexB.release()
mutexA.release()
def f2(self):
mutexB.acquire()
print(f'{self.name}抢到了B锁')
time.sleep(2)
mutexA.acquire()
print(f'{self.name}抢到了A锁')
mutexA.release()
mutexB.release()
for i in range(10):
t = MyThread()
t.start()
原因
线程1 先开始执行func1,分别拿到AB锁,然后再释放AB锁,线程1继续先执行func2,先拿到了B锁,开始sleep,这时线程2拿到了A锁。这时候就形成了僵局,线程2想要线程1手里的B锁,线程1想要线程2里的A锁。
解决死锁现象
from threading import Thread, RLock
import time
mutexA = RLock()
mutexB = mutexA
class MyThread(Thread):
def run(self):
self.f1()
self.f2()
def f1(self):
mutexA.acquire()
print(f'{self.name}抢到了A锁')
mutexB.acquire()
print(f'{self.name}抢到了B锁')
mutexB.release()
mutexA.release()
def f2(self):
mutexB.acquire()
print(f'{self.name}抢到了B锁')
time.sleep(2)
mutexA.acquire()
print(f'{self.name}抢到了A锁')
mutexA.release()
mutexB.release()
for i in range(10):
t = MyThread()
t.start()
信号量
在互斥锁中同时只允许一个线程更改数据,而在信号量`Semaphore`是同时(无序轮流)允许一定数量(设置参数)的线程更改数据 ,内置的计数器,每当调用`acquire()`时减1,调用`release()`时加1。计数器不能小于0,当计数器为0时,`acquire()`将阻塞线程至同步锁定状态,直到其他线程调用`release()`
from threading import Thread, Semaphore
import time
import random
sp = Semaphore(5) # 每次最大5个线程运行
def task(num):
sp.acquire() # 抢锁
print("线程%s,抢锁" % num)
time.sleep(random.randint(1, 5))
sp.release() # 放锁
print("线程%s,放锁" % num)
for i in range(20):
t = Thread(target=task, args=(i,))
t.start()
event事件
事件event
中有一个全局内置标志Flag
,值为 True
或者False
。使用wait()
函数的线程会处于阻塞状态,此时Flag
指为False
,直到有其他线程调用set()
函数让全局标志Flag
置为True
,其阻塞的线程立刻恢复运行,还可以用isSet()
函数检查当前的Flag
状态.
函数 | 描述 |
---|---|
set() | 全局内置标志Flag,将标志Flag 设置为 True,通知在等待状态(wait)的线程恢复运行; |
isSet() | 获取标志Flag当前状态,返回True 或者 False; |
wait() | 一旦调用,线程将会处于阻塞状态,直到等待其他线程调用set()函数恢复运行; |
clear() | 将标志设置为False; |
from threading import Thread, Event
import time
import random
event = Event() # 创建event事件,造了一个红绿灯
def light():
print('红灯亮,所有人不能动')
time.sleep(3)
print('绿灯亮,油门踩到底,冲!!!')
event.set()
def car(name):
print("选手%s号,正在等红灯" % name)
event.wait()
print("选手%s号加油门,飙车了" % name)
t = Thread(target=light)
t.start()
for i in range(20):
t = Thread(target=car, args=(i,))
t.start()
补充:事件Event
主要用于唤醒正在阻塞等待状态的线程
进程池与线程池(重点)
**保证硬件能够正常工作的情况下,最大限度的利用资源。**如何理解这句话呢?比如TCP服务端实现了并发效果,来一个人就开始一个进程或线程来服务这个人,那么如果来一个亿人呢?无论是开设进程还是线程都需要消耗资源,只是开设线程的资源会比进程小一点,但是也经不起一个一个开,瓶颈是计算机硬件资源无法跟上。
**“池”的意思就是在保证计算机能够正常运行的前提下,能最大限度开采的资源的量,虽然降低了程序的运行效率,但是保证了计算机硬件的安全,而让程序可以正常运行**
Python提供了 concurrent.futures
模块,这个模块具有线程池(ThreadPoolExecutor
)和进程`池(
ProcessPoolExecutor`),用来管理并行编程任务、处理非确定性的执行流程、进程/线程同步等功能
常用方法:
submit(fn, *args, **kwargs)
:将fn
函数提交给线程池。*args
代表传给fn
函数的参数,*kwargs
代表以关键字参数的形式为fn
函数传入参数。map(func, *iterables, timeout=None, chunksize=1)
:该函数类似于全局函数map(func, *iterables)
,只是该函数将会启动多个线程,以异步方式立即对iterables
执行map
处理。shutdown(wait=True)
:关闭线程池。cancel()
:取消某个任务done()
:判断某一个线程是否完成result(timeout=None)
:取得结果,如果代表的线程任务还未完成,该方法将会阻塞当前线程,其中 timeout 参数指定最多阻塞多少秒。
官方文档:https://docs.python.org/zh-cn/3/library/concurrent.futures.html
进程池
提前创建好固定数量的进程,后续反复使用这些进程
from concurrent.futures import ProcessPoolExecutor
import os, time, random
def task(n):
print('PID: %s is running' % os.getpid())
time.sleep(random.randint(1, 3))
return n ** 2
if __name__ == '__main__':
pool = ProcessPoolExecutor(5) # 线程池线程数默认是CPU个数的五倍 也可以自定义
futures = []
for i in range(11):
t = pool.submit(task, i) # 将可调用对象封装为异步执行
futures.append(t)
pool.shutdown(True) # 等待返回结果才能继续往下执行
for future in futures:
print(future.result())
线程池
提前创建好固定数量的线程,后续反复使用这些线程
from concurrent.futures import ThreadPoolExecutor
import os, time, random
def task(n):
print('PID: %s is running' % os.getpid())
time.sleep(random.randint(1, 3))
return n ** 2
def fn(n):
print(n.result())
if __name__ == '__main__':
pool = ThreadPoolExecutor(5) # 线程池线程数默认是CPU个数的五倍 也可以自定义
for i in range(11):
pool.submit(task, i).add_done_callback(fn) # 一旦某个线程执行完成任务,就会执行执行fn方法,并且fn方法的第一个参数就是future对象
总结
需要掌握的就这三行代码
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
pool = ThreadPoolExecutor()
pool.submit(task, i).add_done_callback(call_back)