什么是进程(process)
进程是操作系统分配资源的最小单元,进程好比工厂的车间,代表cpu所能处理的单个任务,
任意时刻CPU总是运行一个进程,其他进程处于非运行状态;
进程的内存空间是共享的;
什么是线程(thread)
线程是操作系统调度的最小单元,线程好比车间的工人,一个进程可包括一个或多个线程,多线程协同完成一个任务;
多线程都可使用进程的共享空间;
线程安全相当于车间的厕所,一次只能进入一个人,防止他人同时进入的方法就是加上”🔒”,称为”互斥锁”;
容纳固定数目的线程,可通过给线程加上”信号量”(semaphore),保证多个线程不会相互冲突。
什么是协程(coroutine)
协程又称微线程,是一种用户态的轻量级线程,不是被操作系统内核所管理,而完全是由程序所控制(也就是在用户态执行)。
gevent的基本原理来自libev和libuv,本质上是一种时间驱动模型,如果代码中引入了带io阻塞的代码时,lib本身会自动完成上下文的切换。
- 优点:
- 无需线程上下文切换的开销,线程数量越多,协程的性能优势越明显。
 - 无需原子操作锁定及同步的开销,协程不需要多线程的锁机制,因为只有一个线程,不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态。
 
 - 缺点:
- 无法利用多核资源,协程本质是单线程,不能同时将单CPU的多个核用上,协程需要进程配合使用才能运行在多CPU上。
 - 进行阻塞操作会阻塞掉整个程序。
 
 
并行和并发
并行是指计算机系统中能同时执行两个或多个任务的计算方法,并行处理可同时工作于同一程序的不同方面。
并发是同一个时间段内有几个程序都在一个CPU中处于运行状态,但任一时刻只有一个程序在CPU上运行。
并发在于有处理多个任务的能力,不一定要同时;并行在于就是同时处理多个任务的能力,并行是并发的子集。
阻塞/非阻塞、同步/异步
在进程通信中,阻塞/非阻塞,同步/异步是同义词,但是需要区分对象是发送方还是接收方。
发送方阻塞/非阻塞(同步/异步)和接收方的阻塞/非阻塞(同步/异步)是互不影响的。
在IO系统调用层面,非阻塞IO系统调用和异步IO系统调用存在一定差别,他们都不阻塞进程,但返回结果方式和内容是有所差别的,可能是完整的结果、也可以是不完整的,也可以是空值,但都属于非阻塞系统调用。
同步与异步关注的是消息通信机制:
1. 你打电话问书店老板有没有xxxx这本书,如果是同步通信机制,书店老板会说,你稍等,”我查一下”,然后开始查,等查好了(可能是5秒,也可能是一天)告诉你结果(返回结果)。
2. 异步通信机制,书店老板直接告诉你我查一下啊,查好了打电话给你,然后直接挂电话了(不返回结果)。然后查好了,他会主动打电话给你。在这里老板通过“回电”这种方式来回调 阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态: 阻塞调用是指调用结果返回之前,当前线程会被挂起,调用线程只有在得到结果之后才会返回。
非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。
- 你打电话问书店老板有没有xxx这本书,你如果是阻塞式调用,你会一直把自己“挂起”,直到得到这本书有没有的结果,如果是非阻塞式调用,你不管老板有没有告诉你,你自己先一边去玩了, 当然你也要偶尔过几分钟check一下老板有没有返回结果。
 
在这里阻塞与非阻塞与是否同步异步无关。跟老板通过什么方式回答你结果无关
mulprocessing实现多进程
常用函数说明
Process():通过Process方法创建进程,接收两个参数,target:一般指向函数名,args:需要向函数传递的参数
- start():开始进程
 - join():阻塞主进程,等待子进程完成
 - os.getpid():打印当前进程名称
 
Pool类创建多进程
- apply_async(func[, args=()[, kwds={}[[, callback=None]]]):向进程池提交需要执行的函数及参数,非阻塞调用。
 - map(func,iterable):使进程阻塞直到返回结果。
 - map_async():同map函数,非阻塞调用。
 - close():关闭进程池(pool),使其不再接受新的任务。
 - terminate():结束工作进程,不再处理未处理的任务。
 - join():主进程阻塞等待子进程的退出,在close()和terminate()之后使用 ```python import time import os from multiprocessing import Process, cpu_count, Pool
 
def long_time_task2(i): print(“子进程:{} - 任务: {}”.format(os.getpid(), i)) with open(‘process_text.txt’, ‘w’) as f: for j in range(100000): f.write(str(j) + ‘\n’)
if name == “main“: print(“当前cpu核数:{}”.format(cpu_count())) print(“当前母进程:{}”.format(os.getpid())) start = time.time()
# 使用Poolp = Pool(cpu_count())for i in range(cpu_count()):p.apply_async(long_time_task2, args=(i, ))print("等待所有pool子进程完成")p.close()p.join()end = time.time()print("耗时:{}秒".format(end-start))
<a name="V3X1f"></a>####<a name="n0Uhk"></a>#### 多进程之间的数据共享通过使用队列queue来实现不同进程间的通信或数据共享```pythonfrom multiprocessing import Process, Queueimport os, time, randomdef write(q):print("Process to write: {}".format(os.getpid()))for value in ["A", "B", "C"]:print("Put {} to queue...".format(value))q.put(value)time.sleep(random.random())def read(q):print("Process to read: {}".format(os.getpid()))while True:value = q.get(True)print("Get {} from queue.".format(value))if __name__ == "__main__":q = Queue()pw = Process(target=write, args=(q,))pr = Process(target=read, args=(q,))pw.start()pr.start()pw.join()pr.terminate()
threading实现多线程
- threading.currentThread():返回当前的线程变量
 - threading.enumetate():返回正在运行的线程的list
 - threading.activeCount():返回正在运行的线程数量
 - Thread类处理线程
run(): 用以表示线程活动的方法 start(): 启动线程活动 join(): 等待至线程终止 sAlive(): 返回线程是否活动 getName(): 返回线程名 setName(): 设置线程名 setDaemon: 将该线程声明为守护线程setDaemon(True),子线程会随着父线程的终止而终止;否则,子线程仍在运行中,主线程不会退出(但会向后执行),直到所有子线程运行结束。setDaemon()需要在start()前声明。
 
import threadingimport timedef long_time_task(i):print("当前子进程:{}- 任务 {}".format(threading.current_thread().name, i))# time.sleep(2)# print("结果: {}".format(8 ** 20))with open('thread_text.txt', 'w') as f:for j in range(100000):f.write(str(j) + '\n')if __name__ == "__main__":start = time.time()print("这是主线程:{}".format(threading.current_thread().name))thread_list = []for k in range(1, 10):t = threading.Thread(target=long_time_task, args=(k, ))thread_list.append(t)for i in thread_list:i.setDaemon(True)i.start()for i in thread_list:i.join()end = time.time()print("总共用时{}秒".format(end-start))
多线程之间的数据共享
- threading.lock()实现对线程中共享变量的锁定,确保每次只有一个线程能修改
 - threading.lock().acquire():获得锁
 - threading.lock().release():释放锁 ```python import threading from logzero import logger
 
class Account:
def __init__(self):self.num = 1def add(self, lock):with lock:for i in range(100000):self.num += 1logger.info(f"add self.num: {self.num}")def delete(self, lock):with lock:for i in range(100000):self.num -= 1logger.info(f"delete self.num: {self.num}")
if name == “main“: account = Account() lock = threading.Lock()
# 创建线程thread_num = 10thread_add_list = []thread_delete_list = []for i in range(thread_num):thread_add = threading.Thread(target=account.add, args=(lock, ), name="add")thread_add_list.append(thread_add)thread_delete = threading.Thread(target=account.delete, args=(lock,), name="Delete")thread_delete_list.append(thread_delete)# 启动线程for i in thread_add_list:i.start()i.join()for i in thread_delete_list:i.start()i.join()logger.info(f"num is {account.num}")
“””
[I 220112 13:15:08 threading_test_2:24] add self.num: 100001 [I 220112 13:15:08 threading_test_2:24] add self.num: 200001 [I 220112 13:15:08 threading_test_2:24] add self.num: 300001 [I 220112 13:15:08 threading_test_2:24] add self.num: 400001 [I 220112 13:15:08 threading_test_2:24] add self.num: 500001 [I 220112 13:15:08 threading_test_2:24] add self.num: 600001 [I 220112 13:15:08 threading_test_2:24] add self.num: 700001 [I 220112 13:15:08 threading_test_2:24] add self.num: 800001 [I 220112 13:15:08 threading_test_2:24] add self.num: 900001 [I 220112 13:15:08 threading_test_2:24] add self.num: 1000001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 900001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 800001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 700001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 600001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 500001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 400001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 300001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 200001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 100001 [I 220112 13:15:08 threading_test_2:30] delete self.num: 1 [I 220112 13:15:08 threading_test_2:55] num is 1 “””
<a name="IFvCH"></a>####<a name="nTwld"></a>#### 队列实现线程安全的数据共享```pythonfrom queue import Queueimport timeimport threadingimport random# 生产者class Productor(threading.Thread):def __init__(self, name, queue):# threading.Thread.__init__(self, name=name)super(Productor, self).__init__(name=name)self.queue = queuedef run(self):for i in range(1, 5):print("{} is producting {} to the queue".format(self.getName(), i))self.queue.put(i)time.sleep(random.randrange(10)/5)print("{} finished!".format(self.getName()))# 消费者class Consumer(threading.Thread):def __init__(self, name, queue):# threading.Thread.__init__(self, name=name)super(Consumer, self).__init__(name=name)self.queue = queuedef run(self):for i in range(1, 5):val = self.queue.get()print("{} is consuming {} in the queue".format(self.getName(), val))time.sleep(random.randrange(10))print("{} finished!".format(self.getName()))def main():start = time.time()queue = Queue()producer = Productor('Producer', queue)consumer = Consumer('Consumer', queue)producer.start()consumer.start()producer.join()consumer.join()end = time.time()print("all threads finished!, 耗时 {} 秒".format(end-start))if __name__ == "__main__":main()-----------------------------------------------------------------------------"""# @File : threading_test-1.py# @Time : 2021/01/11 23:19:59# @Author : wangshunzhe@desc: 多线程去消费一个队列的例子"""import threadingimport timeimport queue# 下面来通过多线程来处理Queue里面的任务:def work(q):while True:if q.empty():returnelse:t = q.get()print("当前线程sleep {} 秒".format(t))time.sleep(t)def main():q = queue.Queue()for i in range(10):q.put(i) # 往队列里生成消息# 单线程# work(q)# 多线程thread_num = 10threads = []for i in range(thread_num):t = threading.Thread(target=work, args=(q,))# args需要输出的是一个元组,如果只有一个参数,后面加,表示元组,否则会报错threads.append(t)for i in range(thread_num):threads[i].setDaemon(True)threads[i].start()for i in range(thread_num):threads[i].join()if __name__ == "__main__":start = time.time()main()print('耗时:', time.time() - start)"""当前线程sleep 0 秒当前线程sleep 1 秒当前线程sleep 3 秒当前线程sleep 4 秒当前线程sleep 5 秒当前线程sleep 6 秒当前线程sleep 8 秒当前线程sleep 2 秒当前线程sleep 7 秒当前线程sleep 9 秒耗时: 0.0029397010803222656"""
为啥队列是线程安全的?
因为队列Queue的put方法和get方法都是原子操作
tips:
对CPU密集型代码(循环操作)—多进程效率高
对IO密集型代码(文件操作、爬虫)—多线程效率高
原子操作
指不会被线程调度机制打断的操作,这种操作一旦开始,就一直运行到结束,中间不会切换到其他线程。
常见的原子操作:
L.append(i)L1.extend(L2)x = L[i]x = L.pop()L[i:j] = L2L.sort()x = yx.field = yD[x] = yD1.update(D2)D.keys()
gevent实现协程
from __future__ import print_functionimport geventfrom gevent import monkey, sleepmonkey.patch_all()import timeimport requestsurls = ['https://www.baidu.com/','https://kitten4.codemao.cn/']def print_head(url):start_time = time.time()print('Starting %s' % url)data = requests.get(url).textprint('%s: %s bytes: %r' % (url, len(data), data[:50]))total_time = time.time() - start_timeprint('total time is %s' % total_time)# 生成Greenlet对象jobs = [gevent.spawn(print_head, _url) for _url in urls]# 使用joinall对Greenlet对象进行管控,等待所有greenlet对象以达到协程自动切换的目的# 返回所有已经join的greenlet对象列表gevent.joinall(jobs)"""Starting https://www.baidu.com/Starting https://kitten4.codemao.cn/https://www.baidu.com/: 2443 bytes: '<!DOCTYPE html>\r\n<!--STATUS OK--><html> <head><met'total time is 0.12381315231323242https://kitten4.codemao.cn/: 15049 bytes: '<!doctype html><html><head><script>"use strict";\n\n'total time is 0.508491039276123"""
gevent下的monkey机制
自动替换原来的thread、socket、time、multiprocessing等代码,全部变成gevent框架
from gevent import monkey, sleepmonkey.patch_all()import socketdef print_monkey():print('obj', socket.socket)jobs = [gevent.spawn(print_monkey)]# 使用joinall对Greenlet对象进行管控,等待所有greenlet对象以达到协程自动切换的目的# 返回所有已经join的greenlet对象列表gevent.joinall(jobs)"""obj <class 'gevent._socket3.socket'>"""# 没有打补丁的情况下import socketdef print_monkey():print('obj', socket.socket)jobs = [gevent.spawn(print_monkey)]# 使用joinall对Greenlet对象进行管控,等待所有greenlet对象以达到协程自动切换的目的# 返回所有已经join的greenlet对象列表gevent.joinall(jobs)"""obj <class 'socket.socket'>"""
gevent延时操作
from gevent import monkey, sleepmonkey.patch_all()import geventdef f1():for i in range(6, 11):print('this is ' + str(i))# 会主动调用切换函数gevent.sleep(2)def f2():for i in range(5):print('that is ' + str(i))t1 = gevent.spawn(f1)t2 = gevent.spawn(f2)gevent.wait([t1, t2])"""this is 6that is 0that is 1that is 2that is 3that is 4this is 7this is 8this is 9this is 10"""
gevent常用方法
| gevent.spawn() | 创建一个普通的Greenlet对象并切换 | 
|---|---|
| gevent.spawn_later(seconds=1) | 延时创建一个普通的Greenlet对象并切换 | 
| gevent.getcurrent() | 返回当前正在执行的greenlet | 
| gevent.joinall(jobs) | 将协程任务添加到事件循环,接收一个任务列表 | 
| gevent.wait() | 替代join函数等待循环结束,,也可以传入协程对象列表 | 
| gevent.kill() | 杀死一个协程 | 
| gevent.killall() | 杀死一个协程列表里的所有协程 | 
| monkey.patch_all() | 自动将python的一些标准模块替换成gevent框架 | 
gevent中的组和池
- 组(group)是一个运行中greenlet集合,集合中的greenlet像一个组一样会被共同管理和调度
- Group().add():将greenlet添加到group中
 - Group().map(func, iterable):由第二个参数控制迭代次数,返回可迭代对象执行结果列表
 - Group().imap():返回一个可迭代对象 ```python def talk(msg): for i in range(2): print(msg)
 
 
g1 = gevent.spawn(talk, ‘bar’) g2 = gevent.spawn(talk, ‘foo’) print(g1) group = Group()
add方法,将greenlet添加到group中
group.add(g1) group.add(g2)
等待spawn完成,完成即从group里面去掉
group.join()
“””
def talk_map(data): print(‘Size of group %s’ % len(group1)) print(‘Hello from Greenlet %s’ % id(getcurrent())) return data
group1 = Group()
返回可迭代对象执行结果列表
res = group1.map(talk_map, [1, 2, 3]) print(type(res)) print(res)
“”” Size of group 3 Hello from Greenlet 140581403301936 Size of group 3 Hello from Greenlet 140581403302224 Size of group 3 Hello from Greenlet 140581403302512
def talk_imap(data): print(‘Size of group %s’ % len(group2)) print(‘Hello from Greenlet %s’ % id(getcurrent())) return data
group2 = Group()
返回可迭代对象执行结果列表
res2 = group2.imap(talk_imap, range(5), maxsize=1) print(type(res2)) print(res2) for i in res2: print(i)
“””
�2. 池(Pool)是一种用于处理需要限制并发性的动态数量1. Pool().map(func, iterable):由第二个参数控制迭代次数,返回可迭代对象执行结果列表2. Pool().imap():返回一个可迭代对象```pythonpool = Pool(2)def hello_from(n):print('Size of pool %s' % len(pool))return 44# pool.map(hello_from, range(4))for i in pool.imap(hello_from, range(3)):print(i)"""Size of pool 2Size of pool 24444Size of pool 144"""
�
gevent结合数据结构
队列queue是一个排序的数据集合,常用的方法如下:
| put_nowait(item) | 非阻塞的往队列放入数据,队列满则抛出full exception | 
|---|---|
| get_nowait() | 非阻塞的从队列读取数据,队列为空则抛出empty exception | 
| put(block=True, timeout=None) | 从队列放入数据,可选是否阻塞和超时时间 | 
| get(block=True, timeout=None) | 从队列读取数据,可选是否阻塞和超时时间 | 
| peek(block=True, timeout=None) | 与get()类似,但获取的数据不会从队列移除 | 
| peek_nowait() | 类似get_nowait() | 
| empty() | 队列为空返回True | 
| full() | 队列已满返回True | 
| qsize() | 返回队列长度 | 
from gevent import monkeyfrom gevent.queue import Queuemonkey.patch_all()import geventtasks = Queue()def worker(n):while not tasks.empty():task = tasks.get()print("Worker {} got task {}".format(n, task))gevent.sleep(0)print("没有数据了")def producer():for i in range(3):tasks.put(i)print("produce: {}".format(i))gevent.spawn(producer).join()gevent.joinall([gevent.spawn(worker('worker A')),gevent.spawn(worker('worker B')),gevent.spawn(worker('worker C')),])"""produce: 0produce: 1produce: 2Worker worker A got task 0Worker worker A got task 1Worker worker A got task 2没有数据了没有数据了没有数据了"""
python中的锁
GIL锁
全局解释器锁,每个线程在执行的时候都需要先获取GIL,保证同一个时刻只有一个线程可以执行代码,即说同一进程内的多个线程同一时间只能有一个运行
- 释放GIL锁的时机:
 
- 遇到IO操作,会造成CPU闲置,会释放GIL
 - 会有专门的triks计数,到达一定值会释放GIL锁,这时候线程之间会开始竞争GIL锁
 - GIL锁和互斥锁的关系:
 
- 假设一个进程中有多线程运行,线程A获得GIL—>获得互斥锁lock,线程A在开始修改数据之前,遇到IO操作(数据读入内存或者内存输出的过程)
 - 线程A释放GIL,线程A、B开始竞争GIL锁
 - 假设线程B获得GIL锁—>因为有互斥锁lock,B无法修改数据,释放GIL锁
 - 假设线程A再次竞争到GIL锁,因为其占有互斥锁,开始修改数据—>释放互斥锁
 - 这时候线程B竞争到GIL和互斥锁后才能修改数据
 - 综上,cpython中,多线程比单线程性能快的原因是:遇到IO阻塞时,正在执行的线程会释放GIL锁,其他线程会利用这个空闲时间执行自己的代码
 - 关于计算密集型,使用多进程效率高,关于IO密集型,使用多线程效率高
 
# 进程应用场景# 计算密集型:多进程效率高from multiprocessing import Processfrom threading import Threadimport os,timedef work():res=0for i in range(10000000):res*=iif __name__ == '__main__':l=[]print(os.cpu_count()) #本机为4核start=time.time()for i in range(4):p=Process(target=work) #耗时run time is 0.8030457496643066# p=Thread(target=work) #耗时run time is 2.134121894836426l.append(p)p.start()for p in l:p.join()stop=time.time() #print('run time is %s' %(stop-start))
同步锁
import threadingimport time"""同步锁"""lock = threading.Lock()"""没有加锁的情况"""def func():global numnum1 = numtime.sleep(0.1) # 此处线程被释放,num值还没有修改num = num1 - 1num = 100l = []for i in range(100):t = threading.Thread(target=func, args=())t.start()l.append(t)for i in l:i.join()print(num) # 99"""加锁的情况"""def func_1():global num3with lock:num2 = num3time.sleep(0.1)num3 = num2 - 1num3 = 100l2 = []for i in range(100):t = threading.Thread(target=func_1, args=())t.start()l2.append(t)for i in l2:i.join()print(num3) # 0
递归锁(重入锁)
为了支持同一个线程中多次请求同一资源,Python 提供了可重入锁(RLock)。这个RLock内部维护着一个锁(Lock)和一个计数器(counter)变量,counter 记录了acquire 的次数,从而使得资源可以被多次acquire。直到一个线程所有 acquire都被release(计数器counter变为0),其他的线程才能获得资源。
"""递归锁"""import timeimport threadingr_lock = threading.RLock()class MyThread(threading.Thread):def __init__(self):threading.Thread.__init__(self)def run(self):self.fun_A()self.fun_B()def fun_A(self):r_lock.acquire()print("A加锁1", end='\t')r_lock.acquire()print('A加锁2', end='\t')time.sleep(0.2)r_lock.release()print('A释放1', end='\t')r_lock.release()print('A释放2')def fun_B(self):r_lock.acquire()print("B加锁1", end='\t')r_lock.acquire()print('B加锁2', end='\t')time.sleep(3)r_lock.release()print('B释放1', end='\t')r_lock.release()print('B释放2')if __name__ == '__main__':t1 = MyThread()t2 = MyThread()t1.start()t2.start()"""A加锁1 A加锁2 A释放1 A释放2A加锁1 A加锁2 A释放1 A释放2B加锁1 B加锁2 B释放1 B释放2B加锁1 B加锁2 B释放1 B释放2""""""注意观察程序的运行,当运行到程序B时,即使B休眠了3秒也不会切换线程"""
信号量
信号量是一个内部数据,它有一个内置的计数器,它标明当前的共享资源可以有多少线程同时读取。
信号量控制规则:当计数器大于0时,那么可以为线程分配资源权限;当计数器小于0时,未获得权限的线程会被挂起,直到其他线程释放资源。
"""信号量"""import randomimport threadingimport time# 创建信号量对象,信号量设置为3,需要有三个线程才启动semaphore = threading.Semaphore(3)def func():# 获取信号 -1if semaphore.acquire():print(threading.currentThread().getName() + '获得信号量')time.sleep(random.randint(1, 5))# 释放信号 +1semaphore.release()for i in range(10):t1 = threading.Thread(target=func)t1.start()"""注意观察程序运行,开始只有3个线程获得了资源的权限,后面当释放几个资源时就有几个获得资源权限"""
信号量被初始化为0,目的是同步两个或多个线程。线程必须并行运行,所以需要信号量同步。
"""信号量"""import randomimport threadingimport time# 同步两个不同线程,信号量被初始化为0semaphore = threading.Semaphore(0)def consumer():print("----等待producer运行----")semaphore.acquire() # 获取资源,信号量为0被挂起,等待信号量释放print("----consumer 结束--- 编号:%s" % item)def producer():global itemtime.sleep(3)item = random.randint(0, 100)print("producer运行编号:%s" % item)semaphore.release()if __name__ == "__main__":for i in range(0, 4):t1 = threading.Thread(target=producer)t2 = threading.Thread(target=consumer)t1.start()t2.start()t1.join()t2.join()print('程序终止')"""信号量被初始化为0,目的是同步两个或多个线程。线程必须并行运行,所以需要信号量同步。这种运用场景有时会用到,比较难理解,多运行示例仔细观察打印结果"""
参考:https://zhuanlan.zhihu.com/p/37620890
参考:https://zhuanlan.zhihu.com/p/46368084

