什么是进程(process)
进程是操作系统分配资源的最小单元,进程好比工厂的车间,代表cpu所能处理的单个任务,
任意时刻CPU总是运行一个进程,其他进程处于非运行状态;
进程的内存空间是共享的;
什么是线程(thread)
线程是操作系统调度的最小单元,线程好比车间的工人,一个进程可包括一个或多个线程,多线程协同完成一个任务;
多线程都可使用进程的共享空间;
线程安全相当于车间的厕所,一次只能进入一个人,防止他人同时进入的方法就是加上”🔒”,称为”互斥锁”;
容纳固定数目的线程,可通过给线程加上”信号量”(semaphore),保证多个线程不会相互冲突。
什么是协程(coroutine)
协程又称微线程,是一种用户态的轻量级线程,不是被操作系统内核所管理,而完全是由程序所控制(也就是在用户态执行)。
gevent的基本原理来自libev和libuv,本质上是一种时间驱动模型,如果代码中引入了带io阻塞的代码时,lib本身会自动完成上下文的切换。
- 优点:
- 无需线程上下文切换的开销,线程数量越多,协程的性能优势越明显。
- 无需原子操作锁定及同步的开销,协程不需要多线程的锁机制,因为只有一个线程,不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态。
- 缺点:
- 无法利用多核资源,协程本质是单线程,不能同时将单CPU的多个核用上,协程需要进程配合使用才能运行在多CPU上。
- 进行阻塞操作会阻塞掉整个程序。
并行和并发
并行是指计算机系统中能同时执行两个或多个任务的计算方法,并行处理可同时工作于同一程序的不同方面。
并发是同一个时间段内有几个程序都在一个CPU中处于运行状态,但任一时刻只有一个程序在CPU上运行。
并发在于有处理多个任务的能力,不一定要同时;并行在于就是同时处理多个任务的能力,并行是并发的子集。
阻塞/非阻塞、同步/异步
在进程通信中,阻塞/非阻塞,同步/异步是同义词,但是需要区分对象是发送方还是接收方。
发送方阻塞/非阻塞(同步/异步)和接收方的阻塞/非阻塞(同步/异步)是互不影响的。
在IO系统调用层面,非阻塞IO系统调用和异步IO系统调用存在一定差别,他们都不阻塞进程,但返回结果方式和内容是有所差别的,可能是完整的结果、也可以是不完整的,也可以是空值,但都属于非阻塞系统调用。
同步与异步关注的是消息通信机制:
1. 你打电话问书店老板有没有xxxx这本书,如果是同步通信机制,书店老板会说,你稍等,”我查一下”,然后开始查,等查好了(可能是5秒,也可能是一天)告诉你结果(返回结果)。
2. 异步通信机制,书店老板直接告诉你我查一下啊,查好了打电话给你,然后直接挂电话了(不返回结果)。然后查好了,他会主动打电话给你。在这里老板通过“回电”这种方式来回调 阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态: 阻塞调用是指调用结果返回之前,当前线程会被挂起,调用线程只有在得到结果之后才会返回。
非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。
- 你打电话问书店老板有没有xxx这本书,你如果是阻塞式调用,你会一直把自己“挂起”,直到得到这本书有没有的结果,如果是非阻塞式调用,你不管老板有没有告诉你,你自己先一边去玩了, 当然你也要偶尔过几分钟check一下老板有没有返回结果。
在这里阻塞与非阻塞与是否同步异步无关。跟老板通过什么方式回答你结果无关
mulprocessing实现多进程
常用函数说明
Process():通过Process方法创建进程,接收两个参数,target:一般指向函数名,args:需要向函数传递的参数
- start():开始进程
- join():阻塞主进程,等待子进程完成
- os.getpid():打印当前进程名称
Pool类创建多进程
- apply_async(func[, args=()[, kwds={}[[, callback=None]]]):向进程池提交需要执行的函数及参数,非阻塞调用。
- map(func,iterable):使进程阻塞直到返回结果。
- map_async():同map函数,非阻塞调用。
- close():关闭进程池(pool),使其不再接受新的任务。
- terminate():结束工作进程,不再处理未处理的任务。
- join():主进程阻塞等待子进程的退出,在close()和terminate()之后使用 ```python import time import os from multiprocessing import Process, cpu_count, Pool
def long_time_task2(i): print(“子进程:{} - 任务: {}”.format(os.getpid(), i)) with open(‘process_text.txt’, ‘w’) as f: for j in range(100000): f.write(str(j) + ‘\n’)
if name == “main“: print(“当前cpu核数:{}”.format(cpu_count())) print(“当前母进程:{}”.format(os.getpid())) start = time.time()
# 使用Pool
p = Pool(cpu_count())
for i in range(cpu_count()):
p.apply_async(long_time_task2, args=(i, ))
print("等待所有pool子进程完成")
p.close()
p.join()
end = time.time()
print("耗时:{}秒".format(end-start))
<a name="V3X1f"></a>
####
<a name="n0Uhk"></a>
#### 多进程之间的数据共享
通过使用队列queue来实现不同进程间的通信或数据共享
```python
from multiprocessing import Process, Queue
import os, time, random
def write(q):
print("Process to write: {}".format(os.getpid()))
for value in ["A", "B", "C"]:
print("Put {} to queue...".format(value))
q.put(value)
time.sleep(random.random())
def read(q):
print("Process to read: {}".format(os.getpid()))
while True:
value = q.get(True)
print("Get {} from queue.".format(value))
if __name__ == "__main__":
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
pw.start()
pr.start()
pw.join()
pr.terminate()
threading实现多线程
- threading.currentThread():返回当前的线程变量
- threading.enumetate():返回正在运行的线程的list
- threading.activeCount():返回正在运行的线程数量
- Thread类处理线程
run(): 用以表示线程活动的方法 start(): 启动线程活动 join(): 等待至线程终止 sAlive(): 返回线程是否活动 getName(): 返回线程名 setName(): 设置线程名 setDaemon: 将该线程声明为守护线程setDaemon(True),子线程会随着父线程的终止而终止;否则,子线程仍在运行中,主线程不会退出(但会向后执行),直到所有子线程运行结束。setDaemon()需要在start()前声明。
import threading
import time
def long_time_task(i):
print("当前子进程:{}- 任务 {}".format(threading.current_thread().name, i))
# time.sleep(2)
# print("结果: {}".format(8 ** 20))
with open('thread_text.txt', 'w') as f:
for j in range(100000):
f.write(str(j) + '\n')
if __name__ == "__main__":
start = time.time()
print("这是主线程:{}".format(threading.current_thread().name))
thread_list = []
for k in range(1, 10):
t = threading.Thread(target=long_time_task, args=(k, ))
thread_list.append(t)
for i in thread_list:
i.setDaemon(True)
i.start()
for i in thread_list:
i.join()
end = time.time()
print("总共用时{}秒".format(end-start))
多线程之间的数据共享
- threading.lock()实现对线程中共享变量的锁定,确保每次只有一个线程能修改
- threading.lock().acquire():获得锁
- threading.lock().release():释放锁 ```python import threading from logzero import logger
class Account:
def __init__(self):
self.num = 1
def add(self, lock):
with lock:
for i in range(100000):
self.num += 1
logger.info(f"add self.num: {self.num}")
def delete(self, lock):
with lock:
for i in range(100000):
self.num -= 1
logger.info(f"delete self.num: {self.num}")
if name == “main“: account = Account() lock = threading.Lock()
# 创建线程
thread_num = 10
thread_add_list = []
thread_delete_list = []
for i in range(thread_num):
thread_add = threading.Thread(target=account.add, args=(lock, ), name="add")
thread_add_list.append(thread_add)
thread_delete = threading.Thread(target=account.delete, args=(lock,), name="Delete")
thread_delete_list.append(thread_delete)
# 启动线程
for i in thread_add_list:
i.start()
i.join()
for i in thread_delete_list:
i.start()
i.join()
logger.info(f"num is {account.num}")
“””
[I 220112 13:15:08 threading_test_2:24] add self.num: 100001 [I 220112 13:15:08 threading_test_2:24] add self.num: 200001 [I 220112 13:15:08 threading_test_2:24] add self.num: 300001 [I 220112 13:15:08 threading_test_2:24] add self.num: 400001 [I 220112 13:15:08 threading_test_2:24] add self.num: 500001 [I 220112 13:15:08 threading_test_2:24] add self.num: 600001 [I 220112 13:15:08 threading_test_2:24] add self.num: 700001 [I 220112 13:15:08 threading_test_2:24] add self.num: 800001 [I 220112 13:15:08 threading_test_2:24] add self.num: 900001 [I 220112 13:15:08 threading_test_2:24] add self.num: 1000001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 900001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 800001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 700001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 600001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 500001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 400001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 300001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 200001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 100001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 1 [I 220112 13:15:08 threading_test_2:55] num is 1 “””
<a name="IFvCH"></a>
####
<a name="nTwld"></a>
#### 队列实现线程安全的数据共享
```python
from queue import Queue
import time
import threading
import random
# 生产者
class Productor(threading.Thread):
def __init__(self, name, queue):
# threading.Thread.__init__(self, name=name)
super(Productor, self).__init__(name=name)
self.queue = queue
def run(self):
for i in range(1, 5):
print("{} is producting {} to the queue".format(self.getName(), i))
self.queue.put(i)
time.sleep(random.randrange(10)/5)
print("{} finished!".format(self.getName()))
# 消费者
class Consumer(threading.Thread):
def __init__(self, name, queue):
# threading.Thread.__init__(self, name=name)
super(Consumer, self).__init__(name=name)
self.queue = queue
def run(self):
for i in range(1, 5):
val = self.queue.get()
print("{} is consuming {} in the queue".format(self.getName(), val))
time.sleep(random.randrange(10))
print("{} finished!".format(self.getName()))
def main():
start = time.time()
queue = Queue()
producer = Productor('Producer', queue)
consumer = Consumer('Consumer', queue)
producer.start()
consumer.start()
producer.join()
consumer.join()
end = time.time()
print("all threads finished!, 耗时 {} 秒".format(end-start))
if __name__ == "__main__":
main()
-----------------------------------------------------------------------------
"""
# @File : threading_test-1.py
# @Time : 2021/01/11 23:19:59
# @Author : wangshunzhe
@desc: 多线程去消费一个队列的例子
"""
import threading
import time
import queue
# 下面来通过多线程来处理Queue里面的任务:
def work(q):
while True:
if q.empty():
return
else:
t = q.get()
print("当前线程sleep {} 秒".format(t))
time.sleep(t)
def main():
q = queue.Queue()
for i in range(10):
q.put(i) # 往队列里生成消息
# 单线程
# work(q)
# 多线程
thread_num = 10
threads = []
for i in range(thread_num):
t = threading.Thread(target=work, args=(q,))
# args需要输出的是一个元组,如果只有一个参数,后面加,表示元组,否则会报错
threads.append(t)
for i in range(thread_num):
threads[i].setDaemon(True)
threads[i].start()
for i in range(thread_num):
threads[i].join()
if __name__ == "__main__":
start = time.time()
main()
print('耗时:', time.time() - start)
"""
当前线程sleep 0 秒
当前线程sleep 1 秒
当前线程sleep 3 秒
当前线程sleep 4 秒
当前线程sleep 5 秒
当前线程sleep 6 秒
当前线程sleep 8 秒
当前线程sleep 2 秒
当前线程sleep 7 秒
当前线程sleep 9 秒
耗时: 0.0029397010803222656
"""
为啥队列是线程安全的?
因为队列Queue的put方法和get方法都是原子操作
tips:
对CPU密集型代码(循环操作)—多进程效率高
对IO密集型代码(文件操作、爬虫)—多线程效率高
原子操作
指不会被线程调度机制打断的操作,这种操作一旦开始,就一直运行到结束,中间不会切换到其他线程。
常见的原子操作:
L.append(i)
L1.extend(L2)
x = L[i]
x = L.pop()
L[i:j] = L2
L.sort()
x = y
x.field = y
D[x] = y
D1.update(D2)
D.keys()
gevent实现协程
from __future__ import print_function
import gevent
from gevent import monkey, sleep
monkey.patch_all()
import time
import requests
urls = [
'https://www.baidu.com/',
'https://kitten4.codemao.cn/'
]
def print_head(url):
start_time = time.time()
print('Starting %s' % url)
data = requests.get(url).text
print('%s: %s bytes: %r' % (url, len(data), data[:50]))
total_time = time.time() - start_time
print('total time is %s' % total_time)
# 生成Greenlet对象
jobs = [gevent.spawn(print_head, _url) for _url in urls]
# 使用joinall对Greenlet对象进行管控,等待所有greenlet对象以达到协程自动切换的目的
# 返回所有已经join的greenlet对象列表
gevent.joinall(jobs)
"""
Starting https://www.baidu.com/
Starting https://kitten4.codemao.cn/
https://www.baidu.com/: 2443 bytes: '<!DOCTYPE html>\r\n<!--STATUS OK--><html> <head><met'
total time is 0.12381315231323242
https://kitten4.codemao.cn/: 15049 bytes: '<!doctype html><html><head><script>"use strict";\n\n'
total time is 0.508491039276123
"""
gevent下的monkey机制
自动替换原来的thread、socket、time、multiprocessing等代码,全部变成gevent框架
from gevent import monkey, sleep
monkey.patch_all()
import socket
def print_monkey():
print('obj', socket.socket)
jobs = [gevent.spawn(print_monkey)]
# 使用joinall对Greenlet对象进行管控,等待所有greenlet对象以达到协程自动切换的目的
# 返回所有已经join的greenlet对象列表
gevent.joinall(jobs)
"""
obj <class 'gevent._socket3.socket'>
"""
# 没有打补丁的情况下
import socket
def print_monkey():
print('obj', socket.socket)
jobs = [gevent.spawn(print_monkey)]
# 使用joinall对Greenlet对象进行管控,等待所有greenlet对象以达到协程自动切换的目的
# 返回所有已经join的greenlet对象列表
gevent.joinall(jobs)
"""
obj <class 'socket.socket'>
"""
gevent延时操作
from gevent import monkey, sleep
monkey.patch_all()
import gevent
def f1():
for i in range(6, 11):
print('this is ' + str(i))
# 会主动调用切换函数
gevent.sleep(2)
def f2():
for i in range(5):
print('that is ' + str(i))
t1 = gevent.spawn(f1)
t2 = gevent.spawn(f2)
gevent.wait([t1, t2])
"""
this is 6
that is 0
that is 1
that is 2
that is 3
that is 4
this is 7
this is 8
this is 9
this is 10
"""
gevent常用方法
gevent.spawn() | 创建一个普通的Greenlet对象并切换 |
---|---|
gevent.spawn_later(seconds=1) | 延时创建一个普通的Greenlet对象并切换 |
gevent.getcurrent() | 返回当前正在执行的greenlet |
gevent.joinall(jobs) | 将协程任务添加到事件循环,接收一个任务列表 |
gevent.wait() | 替代join函数等待循环结束,,也可以传入协程对象列表 |
gevent.kill() | 杀死一个协程 |
gevent.killall() | 杀死一个协程列表里的所有协程 |
monkey.patch_all() | 自动将python的一些标准模块替换成gevent框架 |
gevent中的组和池
- 组(group)是一个运行中greenlet集合,集合中的greenlet像一个组一样会被共同管理和调度
- Group().add():将greenlet添加到group中
- Group().map(func, iterable):由第二个参数控制迭代次数,返回可迭代对象执行结果列表
- Group().imap():返回一个可迭代对象 ```python def talk(msg): for i in range(2): print(msg)
g1 = gevent.spawn(talk, ‘bar’) g2 = gevent.spawn(talk, ‘foo’) print(g1) group = Group()
add方法,将greenlet添加到group中
group.add(g1) group.add(g2)
等待spawn完成,完成即从group里面去掉
group.join()
“””
def talk_map(data): print(‘Size of group %s’ % len(group1)) print(‘Hello from Greenlet %s’ % id(getcurrent())) return data
group1 = Group()
返回可迭代对象执行结果列表
res = group1.map(talk_map, [1, 2, 3]) print(type(res)) print(res)
“”” Size of group 3 Hello from Greenlet 140581403301936 Size of group 3 Hello from Greenlet 140581403302224 Size of group 3 Hello from Greenlet 140581403302512
def talk_imap(data): print(‘Size of group %s’ % len(group2)) print(‘Hello from Greenlet %s’ % id(getcurrent())) return data
group2 = Group()
返回可迭代对象执行结果列表
res2 = group2.imap(talk_imap, range(5), maxsize=1) print(type(res2)) print(res2) for i in res2: print(i)
“””
�
2. 池(Pool)是一种用于处理需要限制并发性的动态数量
1. Pool().map(func, iterable):由第二个参数控制迭代次数,返回可迭代对象执行结果列表
2. Pool().imap():返回一个可迭代对象
```python
pool = Pool(2)
def hello_from(n):
print('Size of pool %s' % len(pool))
return 44
# pool.map(hello_from, range(4))
for i in pool.imap(hello_from, range(3)):
print(i)
"""
Size of pool 2
Size of pool 2
44
44
Size of pool 1
44
"""
�
gevent结合数据结构
队列queue是一个排序的数据集合,常用的方法如下:
put_nowait(item) | 非阻塞的往队列放入数据,队列满则抛出full exception |
---|---|
get_nowait() | 非阻塞的从队列读取数据,队列为空则抛出empty exception |
put(block=True, timeout=None) | 从队列放入数据,可选是否阻塞和超时时间 |
get(block=True, timeout=None) | 从队列读取数据,可选是否阻塞和超时时间 |
peek(block=True, timeout=None) | 与get()类似,但获取的数据不会从队列移除 |
peek_nowait() | 类似get_nowait() |
empty() | 队列为空返回True |
full() | 队列已满返回True |
qsize() | 返回队列长度 |
from gevent import monkey
from gevent.queue import Queue
monkey.patch_all()
import gevent
tasks = Queue()
def worker(n):
while not tasks.empty():
task = tasks.get()
print("Worker {} got task {}".format(n, task))
gevent.sleep(0)
print("没有数据了")
def producer():
for i in range(3):
tasks.put(i)
print("produce: {}".format(i))
gevent.spawn(producer).join()
gevent.joinall([
gevent.spawn(worker('worker A')),
gevent.spawn(worker('worker B')),
gevent.spawn(worker('worker C')),
])
"""
produce: 0
produce: 1
produce: 2
Worker worker A got task 0
Worker worker A got task 1
Worker worker A got task 2
没有数据了
没有数据了
没有数据了
"""
python中的锁
GIL锁
全局解释器锁,每个线程在执行的时候都需要先获取GIL,保证同一个时刻只有一个线程可以执行代码,即说同一进程内的多个线程同一时间只能有一个运行
- 释放GIL锁的时机:
- 遇到IO操作,会造成CPU闲置,会释放GIL
- 会有专门的triks计数,到达一定值会释放GIL锁,这时候线程之间会开始竞争GIL锁
- GIL锁和互斥锁的关系:
- 假设一个进程中有多线程运行,线程A获得GIL—>获得互斥锁lock,线程A在开始修改数据之前,遇到IO操作(数据读入内存或者内存输出的过程)
- 线程A释放GIL,线程A、B开始竞争GIL锁
- 假设线程B获得GIL锁—>因为有互斥锁lock,B无法修改数据,释放GIL锁
- 假设线程A再次竞争到GIL锁,因为其占有互斥锁,开始修改数据—>释放互斥锁
- 这时候线程B竞争到GIL和互斥锁后才能修改数据
- 综上,cpython中,多线程比单线程性能快的原因是:遇到IO阻塞时,正在执行的线程会释放GIL锁,其他线程会利用这个空闲时间执行自己的代码
- 关于计算密集型,使用多进程效率高,关于IO密集型,使用多线程效率高
# 进程应用场景
# 计算密集型:多进程效率高
from multiprocessing import Process
from threading import Thread
import os,time
def work():
res=0
for i in range(10000000):
res*=i
if __name__ == '__main__':
l=[]
print(os.cpu_count()) #本机为4核
start=time.time()
for i in range(4):
p=Process(target=work) #耗时run time is 0.8030457496643066
# p=Thread(target=work) #耗时run time is 2.134121894836426
l.append(p)
p.start()
for p in l:
p.join()
stop=time.time() #
print('run time is %s' %(stop-start))
同步锁
import threading
import time
"""同步锁"""
lock = threading.Lock()
"""没有加锁的情况"""
def func():
global num
num1 = num
time.sleep(0.1) # 此处线程被释放,num值还没有修改
num = num1 - 1
num = 100
l = []
for i in range(100):
t = threading.Thread(target=func, args=())
t.start()
l.append(t)
for i in l:
i.join()
print(num) # 99
"""加锁的情况"""
def func_1():
global num3
with lock:
num2 = num3
time.sleep(0.1)
num3 = num2 - 1
num3 = 100
l2 = []
for i in range(100):
t = threading.Thread(target=func_1, args=())
t.start()
l2.append(t)
for i in l2:
i.join()
print(num3) # 0
递归锁(重入锁)
为了支持同一个线程中多次请求同一资源,Python 提供了可重入锁(RLock)。这个RLock内部维护着一个锁(Lock)和一个计数器(counter)变量,counter 记录了acquire 的次数,从而使得资源可以被多次acquire。直到一个线程所有 acquire都被release(计数器counter变为0),其他的线程才能获得资源。
"""递归锁"""
import time
import threading
r_lock = threading.RLock()
class MyThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
self.fun_A()
self.fun_B()
def fun_A(self):
r_lock.acquire()
print("A加锁1", end='\t')
r_lock.acquire()
print('A加锁2', end='\t')
time.sleep(0.2)
r_lock.release()
print('A释放1', end='\t')
r_lock.release()
print('A释放2')
def fun_B(self):
r_lock.acquire()
print("B加锁1", end='\t')
r_lock.acquire()
print('B加锁2', end='\t')
time.sleep(3)
r_lock.release()
print('B释放1', end='\t')
r_lock.release()
print('B释放2')
if __name__ == '__main__':
t1 = MyThread()
t2 = MyThread()
t1.start()
t2.start()
"""
A加锁1 A加锁2 A释放1 A释放2
A加锁1 A加锁2 A释放1 A释放2
B加锁1 B加锁2 B释放1 B释放2
B加锁1 B加锁2 B释放1 B释放2
"""
"""注意观察程序的运行,当运行到程序B时,即使B休眠了3秒也不会切换线程"""
信号量
信号量是一个内部数据,它有一个内置的计数器,它标明当前的共享资源可以有多少线程同时读取。
信号量控制规则:当计数器大于0时,那么可以为线程分配资源权限;当计数器小于0时,未获得权限的线程会被挂起,直到其他线程释放资源。
"""信号量"""
import random
import threading
import time
# 创建信号量对象,信号量设置为3,需要有三个线程才启动
semaphore = threading.Semaphore(3)
def func():
# 获取信号 -1
if semaphore.acquire():
print(threading.currentThread().getName() + '获得信号量')
time.sleep(random.randint(1, 5))
# 释放信号 +1
semaphore.release()
for i in range(10):
t1 = threading.Thread(target=func)
t1.start()
"""注意观察程序运行,开始只有3个线程获得了资源的权限,
后面当释放几个资源时就有几个获得资源权限"""
信号量被初始化为0,目的是同步两个或多个线程。线程必须并行运行,所以需要信号量同步。
"""信号量"""
import random
import threading
import time
# 同步两个不同线程,信号量被初始化为0
semaphore = threading.Semaphore(0)
def consumer():
print("----等待producer运行----")
semaphore.acquire() # 获取资源,信号量为0被挂起,等待信号量释放
print("----consumer 结束--- 编号:%s" % item)
def producer():
global item
time.sleep(3)
item = random.randint(0, 100)
print("producer运行编号:%s" % item)
semaphore.release()
if __name__ == "__main__":
for i in range(0, 4):
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()
print('程序终止')
"""
信号量被初始化为0,目的是同步两个或多个线程。
线程必须并行运行,所以需要信号量同步。
这种运用场景有时会用到,比较难理解,多运行示例仔细观察打印结果
"""
参考:https://zhuanlan.zhihu.com/p/37620890
参考:https://zhuanlan.zhihu.com/p/46368084