并发编程
提升代码的执行效率
串行 并发和并行
串行 多个人去排队按照先后顺序逐一去执行
并发 同时运行多个任务,比如有多个任务,只有一个cpu,那么在同一时刻只能处理一个任务,为了避免串行,可以让任务切换运行(每个任务执行一点再切换),达到并发的效果(看似再同时运行)如协程 多线程
并行 假设有多个任务,有多个cpu,那么同一时刻每个cpu都执行一个任务,真正的同时运行任务,如多进程
线程
线程 是计算机中可以被cpu调度的最小单元 (真正工作的单位 )
进程 是计算机资源分配的最小单元为线程提供资源 (进程默认会创建一个线程) 进程只是一个资源单元或者说是资源集合
import threading #线程模块
t = threading.Thread(target = '函数',args = (11,22,33)#传如函数的参数 )
t.start() #开始线程
import time
import request
import threading
t = threading.Thread(target = '函数',args=(1,2,3))
url_list = [
(XXX,url),
(XXX,url),
]
def task(filename,video_url)
res = requests.get(video_url)
with open(file_name,mode='wb') as f:
f.write(res.content)
print(time.time())
for name,url in url_list:
#创建线程,让每个线程都去执行task函数(参数不同)
t = threading.Thread(traget = task,args = (name,url))
t.start()
#执行程序的线程为主线程,在下载的时候不会等待继续往下执行代码,执行完之后等待子线程,子线程完成后程序才完成
多线程开发
import threading
def task(arg)
pass
#创建一个thread对象(线程)并封装线程被cpu调用时应该执行的任务和相关参数
t = threading.Thread(targ=task,args=('XXX',))
#线程准备就绪等待cpu调度,代码继续象下执行
t.start()
print('继续执行') #主线程执行完所有代码,不结束等待子线程
线程的常见方法
t.start() 当前线程准备就绪,等待cpu调度,具体的时间由cpu决定
import threading
loop = 100000
number = 0
def _add (count):
global number
for i in range(count):
number +=1
t = threading.Thread(target=_add,args=(loop,))
t.start()
print(number)
t.join() 让主线程等待当前子线程的任务执行完毕后再继续往下执行
import threading
loop = 100000
number = 0
def _add (count):
global number
for i in range(count):
number +=1
t = threading.Thread(target=_add,args=(loop,))
t.start()
t.join() # 主线程等待t线程执行完后才继续往下走 (主线程从上往下执行代码)
print(number)
import threading
number = 0
def _add ():
global number
for i in range(10000):
number +=1
def _sub():
global number
for i in range(10000):
number -=1
t = threading.Thread(target=_add)
t1 = threading.Thread(target=_sub)
t.start()
t.join() # t线程执行完毕,才继续往下走
#主线程卡在这里,当前只有t线程可以被cpu调度,因为t线程执行了start(),当t线程执行完了,主线程才继续往下走
t1.start()
t1.join() #t1线程执行完毕,才继续往下走
print(number)
import threading
loop = 10000
number = 0
def _add(count):
global = number
for i in range(count)
number+=1
def _sub(count)
global = number
for i in range (count)
number -=1
t1 = threading.Thread(target= _add,args=(loop,))
t2 = threading.Thread(target= _sub,args=(loop,))
t1.start()
t2.start() #cpu在执行线程的时候不会一直让一个线程运行,而是切换着运行,这样就导致数据可能不准确
t1.join() #t1线程执行完毕之后,才继续望后走
t2.join() #t1线程执行完毕之后,才继续望后走
print(number)
t.setDaemon(布尔值),守护线程 必须放在start之前
t.setDaemon(True),设置为守护线程,主线程执行完毕后,子线程也自动关闭
t.setDaemon(False),设置为非守护线程,主线程等待子线程,字线程执行完毕后,主线程才结束(默认)
import threading
import time
def task(arg):
time.sleep(5)
print("任务")
t = threading.Thread(target= task,args = (11,))
t.setDaemon(True)
t.start()
print('end')
线程名称的设置和获取
import threading
def task(arg):
#获取当前执行此代码的线程的名称
name = threading.current_thread().getName()
print(name)
for i in range(10):
t = threading.Thread(target=task.args=(11,))
t.setName('name_{}'.formait(i)) #设置线程的名字需要在start()之前
t.start()
自定义线程类,直接将线程需要做的事情写到run方法中
import threading
class Mythread(threading.Thread):
def run(slef):
print('执行此线程',self._args)
t = Mythread(args= (100,))
t.start()
# 执行线程其实就是执行线程中的run方法
import requests
import threading
class DouyintThread(thread.Thread):
def run(self):
file_name,video_url=self._args
res = request.get(video_url)
with open(file_name,mode='wb') as f:
f.write(res.content)
url_list = [(文件名,网址),(文件名,网址),(文件名,网址)]
for item in url_list :
t = DouyinThread(args=(item[0],item[1]))
t.start()
线程安全
一个进程中可以有多个线程,且线程共享进程中所有的资源
多个线程同时去操作一个东西,可能会存在数据混乱的情况。这种情况叫做线程不安全
可以通过锁来解决不安全,用锁来控制其他线程的运行,只有申请到锁的才可以运行,没有申请到的等待
#不安全情况
import threading
loop = 10000
number = 0
def _add(count):
global = number
for i in range(count):
number+=1
def _sub(count):
global number
for i in range(count):
number-=1
##cpu在执行线程的时候不会一直让一个线程运行,而是切换着运行,这样就导致数据可能不准确,
#如t1线程再相加的时候可能还没完成相加操作就切换到t2线程了,再切回t1线程的时候就继续往下相加了
t1= reading.Thread(target=_add,args=(loop,))
t2= reading.Thread(target=_sub,args=(loop,))
t1.start()
t2.start()
import threading
lock_object = threading.Rlock() #创建锁
loop = 10000
number = 0
def _add(count):
look_object.acquire() #加锁或者叫申请锁 加锁之前先申请锁,谁先来的谁先申请,申请后就加锁 继续往下执行
global = number
for i in range(count):
number+=1
look.object.release()#释放锁
def _sub(count):
look_object.acquire() #此时在等等着申请锁,只有前一个释放了才可以申请到,两个线程用用一把锁才有意义 没有申请到锁的就等待者,只有前一把锁释放了才可以申请到锁
global number
for i in range(count):
number-=1
look.object.release()#解锁
t1= reading.Thread(target=_add,args=(loop,))
t2= reading.Thread(target=_sub,args=(loop,))
t1.start()
t2.start()
#数据混乱情况
import threading
num = 0
def task():
global = num
for i in range(10000):
num+=1
print(num)
for i in range(2)
t = threading.Thread(target=task)
t.start()
================================================================================
#解决方法
import threading
num = 0
lock_object= threading.Rlock()
def task():
lock_object.aceqire() #第一个到达的线程先来到申请并加锁,其他线程就需要等到申请锁的过程
global = num
for i in range(10000):
num+=1
lock_object.release() #线程出去,并释放锁,其他线程就可以进入并执行了
print(num)
for i in range(2)
t = threading.Thread(target=task)
t.start()
加锁释放锁支持上下文管理 with
import threading
num = 0
lock_object = threading.RLOCK()
def task():
print('开始')
with lock_object: #基于上下文管理,内部自动执行acquire 和 release
global = num
for i in range(10000):
num+=1
print('结束')
for i in range(2):
t = threading.Thread(target=task)
t.start()
在开发过程中有些操作默认都是线程安全的(内部集成了锁的机制),使用的时候不用再使用锁的进制
import threading
data_list = []
lock_object = threading.Rlock()
def task():
print('开始')
for i in range(10000):
data_list.append(i)
print(len(data_list))
for i in range(2):
t = threading.Thread(target= task)
t.start()
线程锁
程序中自己手动加锁 有两种Lock和Rlock
Lock 同步锁 不支持嵌套
import threading
num = 0
lock_object = threading.Lock()
def task():
print('开始')
lock_object.acquire() #第一个抵达的线程进入并上锁,其他线程就需要等待
global num
for i in range(1000):
num+=1
lock_object.release() #线程出去,并解开锁,其他线程就可以进入并执行了
print(num)
for i in range(2):
t = threading.Thread(target=task)
t.start()
Rlock 递归锁 支持多次申请锁和多次解锁支持嵌套
内部有计数器每acquire一次计数器+1,只要计数不为0就不能被其他线程acquire
import threading
num = 0
lock_object = threading.Rlock()
def task():
print('开始')
lock_object.acquire() #加锁
lock_object.acquire() #加锁
print(123)
lock_object.release() #解锁
lock_object.release() #解锁
for i in range(3):
t = threading.Thread(target=task)
t.start()
===========================================================
#应用场景
import threading
lock_object = threading.Rlock()
#A开发了一个函数,函数可以被其他人调用,内部需要基于锁保证线程安全
def func():
with lock:
pass
#B开发了一个函数,以可直接调用这个函数
def run():
print('其他')
func() # 调用A开发的函数,内部用到了锁
print('outher')
#c开发了函数需要自己加锁,同事也需求要调用func函数
def process():
with lock:
print('other')
func() # 此时就会出现多次锁的情况只有Rlock支持。Lock不支持
print('other')
死锁
由于竞争资源或者彼此通信做成的一种阻塞现象
import threading
num = 0
lock_object = threading.Lock()
def task():
print('开始')
lock_object.acquire() #第一个线程申请并加锁
print('00001')
lock_object.acquire()#此时第一个线程卡在此处
global num
for i in range(1000):
num +=1
lock_object.release()
lock_object.release()
for i in range(3):
t = threading.Thread(target=task)
t.start()
=============================================================
锁的资源互相竞争
import threading
import time
lock1 = threading.Lock()
lock2 = threading.Lock()
def task1():
lock1.acquire() # t1申请锁
time.sleep(1)
lock2.acquire() # 此时t1申请lock2的锁因为lock2锁已经被t2申请走 卡住
print(11)
lock2.release()
print(111)
lock1.release()
print(1111)
def task2():
lock2.acquire() # t2申请锁
time.sleep(1)
lock1.acquire() # 此时t2申请lock1的锁因为lock1锁已经被t1申请走 卡住
print(22)
lock1.release()
print(222)
lock2.release()
print(2222)
t1 = threading.Thread(target=task1)
t1.start()
t2 = threading.Thread(target=task2)
t2.start()
线程池
python3中才正式提供线程池
线程不是开的越多越好,开的多了可能会导致系统的性能更低了
不建议无限制的创建线程
import conucurrent.futures import ThreadPoolExectuor
def task(video_url,num):
print('开始',video_url)
time.sleep(5)
#创建一个线程池,最多维护10个线程
pool = ThreadPoolExecutor(10)
url_list = ['www.xxx{}.com'.format(i) for i in range(100)]
for url in url_list: #向线程池中提交100个任务,线程池去执行
#在线程池中提交一个任务,线程池中有空闲的线程则分配一个去执行,执行完毕后将线程交还给线程池,如果没有空闲线程则等待
pool.submit(task,url,2)# 异步调用,把任务全部提交
#主线程等待线程池中的任务执行完毕 然后结束
import conucurrent.futures import ThreadPoolExectuor
def task(video_url):
print('开始',video_url)
time.sleep(5)
#创建线程池,最多维护10个线程
pool = ThreadPoolExecutor(10)
url_list = ['www.xxx{}.com'.format(i) for i in range(100)]
for url in url_list: #向线程池中提交100个任务,线程池去执行
#在线程池中提交一个任务,线程池中有空闲的线程则分配一个去执行,执行完毕后将线程交还给线程池,如果没有空闲线程则等待
pool.submit(task,url)#函数 参数
print('执行中')
pool.shutdowm(wait=True) #等待线程池中的所有任务执行完毕后,再继续执行
print('继续往下走')
回调函数add_done_callback()
#该线程执行完任务后再干点别的事情
import time
import random
import concurrent.futures import ThreadPoolExectuor,tuture
def task(video_url):
print('开始',video_url)
time.sleep(5)
return random.randit(0,10)
def done(reaponse):
print('任务执行完后的返回值',response.result())#result可以拿到
#创建线程池,最多维护10个线程
pool = ThreadPoolExecutor(10)
url_list = ['www.xxx{}.com'.format(i) for i in range(100)]
for url in url_list:
future=pool.submit(task,url)
future.add_done_callback(done)# 传入函数名 #是子主线程执行
# pool.submit(task,url)任务执行完毕,return返回值后就会执行回调用函数
#把任务交给线程池后会返回一个特殊的对象,这个对象不是结果,通过该对象可以调用add_done_callback函数,是线程中的任务执行完后,再执行一下done函数
#把任务交给线程池后会返回一个特殊的对象,通过对象.result()可以拿到返回值
#可以用task专门下载,done专门做下载的数据写入本地
#第一个线程执行task后会有一个返回值,这个线程再去执行done的时候会把返回值封装进response,通过result()可以拿到最终的结果
#统一获取结果
import time
import random
import concurrent.futures import ThreadPoolExectuor,tuture
def task(video_url):
print('开始',video_url)
time.sleep(5)
return random.randit(0,10)
pool = ThreadPoolExecutor(10)
future_list = []
for url in url_list:
future=pool.submit(task,url)
future_list.append(future)
pool.shutdown(True)
for i in future_list:
print(i.result())
线程queue
队列:先进先出
import queue
q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')
q.put(4,block=True,timout=3)# 队列阻塞的时候阻塞3秒,如果3秒后还是阻塞则抛出异常
print(q.get())
print(q.get())
print(q.get())
'''
结果(先进先出):
first
second
third
'''
堆栈:后进先出
import queue
q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')
print(q.get())
print(q.get())
print(q.get())
'''
结果(后进先出):
third
second
first
'''
优先级队列:存储数据时可设置优先级的队列
import queue
q=queue.PriorityQueue()
#put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高,第二个元素是存入的数据
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))
print(q.get())
print(q.get())
print(q.get())
'''
结果(数字越小优先级越高,优先级高的优先出队):
(10, 'b')
(20, 'a')
(30, 'c')
'''
event事件
线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行
例如,有多个工作线程尝试链接MySQL,我们想要在链接前确保MySQL服务正常才让那些工作线程去连接MySQL服务器,如果连接不成功,都会去尝试重新连接。那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作
from threading import Thread,Event
def student(name):
print('学生在听课')
event.wait()
print('学生课间活动')
def teacher(name):
print('老师在授课')
event.set()
if __name__ == '__main__':
stu1=Thread(target=student,args=('l1'))
stu2=Thread(target=student,args=('l2'))
stu3=Thread(target=student,args=('l3'))
t1=Thread(target=teacher,args=('l4'))
stu1.start()
stu2.start()
stu3.start()
t1.start()
event.isSet():返回event的状态值;
event.wait(参数time):如果 event.isSet()==False将阻塞线程,超过参数就继续往下运行 等待 time 时间后,执行下一步。或者在调用 event.set() 后立即执行下一步。
event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
event.clear():恢复event的状态值为False。
信号量
信号量也是一把锁,可以指定信号量为5,对比互斥锁同一时间只能有一个任务抢到锁去执行,信号量同一时间可以有5个任务拿到锁去执行,如果说互斥锁是合租房屋的人去抢一个厕所,那么信号量就相当于一群路人争抢公共厕所,公共厕所有多个坑位,这意味着同一时间可以有多个人上公共厕所,但公共厕所容纳的人数是一定的,这便是信号量的大小.
Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
from threading import Thread,Semaphore
import threading
import time
def func():
sm.acquire()
print('%s get sm' %threading.current_thread().getName())
time.sleep(3)
sm.release()
if __name__ == '__main__':
sm=Semaphore(5)
for i in range(23):
t=Thread(target=func)
t.start()
定时器
定时器,指定n秒后执行某操作
from threading import Timer
def hello(n):
print("hello, world")
t = Timer(1, hello,args=(12)) # 1秒之后执行hello
t.start() # after 1 seconds, "hello, world" will be printed
进程
进程中默认会创建一个线程(主线程)进程与进程之间的资源是隔离的
多进程比多线程的开销要大
python中通过多进程可以利用cpu多核的优势,计算密集型操作适用与多进程
t = multiprocessing.Process(target = 函数名,args = (1,2,3))#创建进程
t.start()
#使用多进程的时候需要放在 if__name__ = '__main__' 下面
#linux基于fork windows基于spawn mac基于fork和spawn (python3.8之后是spawn)
#multiprocessing.set_start_method('fork') 模式修改为fork就可以不用放在 if __name__ = '__main__'下面了
os.getpid() # 查看当前进程的id
os.getppid() # 查看父进程的id
多进程开发
进程是计算机中资源分配的最小单元,一个进程中可以有多个线程,同一个进程中的线程共享进程的资源
进程与进程之间则是相互隔离的
from multoprocessing import Process
def task():
pss
def run():
t = multiprocessing.Process(target = 函数名,args = (1,2,3))#创建进程
t.start() # 告诉操作系统开通一个子进程,具体什么时候开通这个进程是操作系统决定的
if__name__ = '__main__' :
run() #不是说把进程的代码放在if__name__ = '__main__' ,而是main启动就好了
进程模式
通过multiprocessing操作进程有3中模式
fork 拷贝几乎所有资源 支持文件对象、线程锁等传参 unix系统 进程代码任意位置
#通过一个进程创建一个子进程的时候,子进程会拷贝当前进程的资源
#特点,通过父进程创建的子进程几乎会把父进程中的资源拷贝一份
import multiprocessing
def task():
print(name)
if __name__ ='__main__':
multiprocessing.set_start_method('fork') #创建进程的模式改为fork
name = []
p1 = multiprocessing.Process(target=task) #创建子进程去运行task函数,子进程拷贝父进程的数据以及父进程中文件对象和锁的当前状态 ,子进程和主进程的数据是两份数据各自维护自己的
p1.start()
=================================================================
import multiprocessing
def task():
print(name)
if __name__ ='__main__':
multiprocessing.set_start_method('fork') #创建进程的模式改为fork
name = []
name.append(123)
p1 = multiprocessing.Process(target=task) #创建子进程去运行task函数,子进程拷贝父进程的数据 ,子进程和主进程的数据是两份数据各自维护自己的
p1.start()
print(name)
=================================================================
import multiprocessing
def task():
print(name)
if __name__ ='__main__':
multiprocessing.set_start_method('fork') #创建进程的模式改为fork
name = []
p1 = multiprocessing.Process(target=task) #创建子进程去运行task函数,子进程拷贝父进程的数据 ,子进程和主进程的数据是两份数据各自维护自己的
p1.start()
name.append(123)
print(name)
==============================================================================
#文件对象和线程锁可以直接拷贝也可以传参拷贝
import multiprocessing
def task():
print(name)
file_object.write('kill\n')
file_object.flush()
if __name__ = '__main__':
multiprocessing.set_start_method('fork')
name = []
file_object = open('x1.txt',mode='a+',encoding='utf-8')
file_object.write('刘小刘\n')#写到内存中 (此时在进程中还没有写到文件里面)
p1= multiprocessing.Process(target=task) #fork模式会拷贝主线程的资源,包括内存中的
p1.start()
#主进程在终止的时候会把内存中的资源刷新到硬盘
"""文件里面的内容是
刘小刘
kill
刘小刘
"""
==============================================================================
import multiprocessing
def task():
print(name)
file_object.write('kill\n')
file_object.flush()
if __name__ = '__main__':
multiprocessing.set_start_method('fork')
name = []
file_object = open('x1.txt',mode='a+',encoding='utf-8')
file_object.write('刘小刘\n')
file_object.flush() #内存中的数据已经被刷新到硬盘
p1= multiprocessing.Process(target=task) #此时进程拷贝的时候会有文件对象,但是文件对象没有值
p1.start()
"""
刘小刘
kill
"""
===============================================================================
import multiprocessing
import threading
def task():
print(lock)
with lock:
print('执行中')
if __name__ = '__main__':
multiprocessing.set_start_method('fork')
name = []
lock = threading.Rlock() #主进程创建锁
lock.acquire() #主进程申请并加锁
p1 = multiprocessing.Process(target = task)#子进程在拷贝资源的时候,锁的状态已经是被主进程的主线程申请走的状态,所以锁在子进程的状态也是被申请走的状态,在子进程中 是被子进程中的主线程申请走了
p1.start()
"""
print(lock)
<unlock _thread.Rlock object owner =0 count=0 at 0x7fbc800cefc0 > 未申请走的状态
<locked _thread.Rlock object owner =4709898909 count=1 at 0x7fbc800cefc0 >申请走的状态
"""
========================================================================
import multiprocessing
import threading
def task():
print(lock) #Rlock支持锁多次
lock.acquire()
print(666)
if __name__ = '__main__':
multiprocessing.set_start_method('fork')
name = []
lock = threading.Rlock() #主进程创建锁
lock.acquire() #主进程申请并加锁
p1 = multiprocessing.Process(target = task)
p1.start()
===========================================================================
import multiprocessing
import threading
def func():
print('来了')
with lock: #创建的10个线程都开在这里,因为锁被子进程的主线程申请去了
print(666)
def task():
for i in range(10):
t = threading.Thread(target=func)
t.start()
lock.release() #在此处解锁,就不会卡住了
if __name__ = '__main__':
multiprocessing.set_start_method('fork')
name = []
lock = threading.Rlock() #主进程创建锁
lock.acquire() #主进程申请并加锁
p1 = multiprocessing.Process(target = task)
p1.start()
spawn run参数传必备的资源 不支持文件对象、线程锁等传参 unix win系统 进程代码main位置
#不会拷贝资源,需要手动传参
#创建子进程的时候会在内部创建一个python解释器,让python解释器去运行代码
import multiprocessing
def task():
print(name)
if __name__ ='__main__':
multiprocessing.set_start_method('spawn') #创建进程的模式改为spawn
name = []
p1 = multiprocessing.Process(target=task) #cpu在运行的时候是运行的Process中的run方法
p1.start()
# 会报错,因为子进程中没有name
================================================================
import multiprocessing
def task(data):
print(data)
if __name__ ='__main__':
multiprocessing.set_start_method('spawn') #创建进程的模式改为fork
name = []
p1 = multiprocessing.Process(target=task,args=(name)) #子进程拿到name后会拷贝一份传给data,他们之间各自维护自己的数据
p1.start()
=====================================================================
#文件对象和线程锁不支持传参需要的话在子进程中创建
forkservre run参数传必备的资源 不支持文件对象、线程锁等传参 部分unix main代码块
#正常代码是由主进程中的主线程来运行的,如果是forkserver模式,在代码运行之前,python会先创建一个进程,在进程中创建一个线程,把他当作模板放进去,在代码中要是创建子进程,则把模板拷贝一份当作子进程(模板相当于一个什么也没有的进程)
进程的常见方法
p.start() 当前线程准备就绪,等待cpu调度
p.join() 等待当前子进程的任务执行完毕后主进程再向下继续执行
import time
from multipricessing import Process
def task(arg):
print('执行中')
if __name__ =="__main__":
multiprocessing.set_start_method('spawn')
p = Process(target=task,args=('XXX',))
p.start()
p.join() # 让主进程等待子进程
print('继续执行')
p.daemon = 布尔值,守护进程必须放在start之前
p.daemon = True 设置为守护进程,主进程执行 完毕后,子进程也自动关闭
p.daemon = False 设置为非守护进程,主进程等子进程。子进程执行完毕后,主进程才结束
进程设置名称和获取名称
import time
import os
import threading
import multiprocessing
def task(arg):
threading.enumerate() #获取当前进程有多少个线程 列表的形式
os.getpid() #当前进程的id
os.getppid() #当前进程的父进程的id
print('当前进程的名字',multiprocess.current_prcess().name) #获取名字
if __name__ = '__main__':
multiprocessing.set_start_method('spawn')
p = multiprocessing.Process(target=task,args=('XXX'),name=('进程名'))
p.name = '90'#设置名字
p.start()
print('继续执行')
自定义进程类,直接将线程需要做的事情写到run方法
import multipocessing
class MyProcess(multiprocess.Process)
def run(self):
print('执行此线程',self._args)
if __name__=='__main__':
multiprocess.set_start_method('spawn')
p = MyProcess(args=('XXX',))
p.start()
cpu个数
import multiprocessing
multiprocessing.cpu_count()
p.is_alive() 查看进程是否存活 True 或 False
p.terminate()
告诉操作系统结束进程,具体结束时间由操作系统决定
进程之间数据共享
进程是资源分配的最小单元,默认进程与进程之间是互相隔离的,不共享
基于value array实现进程数据共享
from multiprocessing import Process ,Value,Array
def func(n,m1,m2)
n.Value = 888
m1.Value = 'a'.encode('utf-8')
m2.value= '武'
if __name__ =='__main__':
num = value('i',666) #创建666类型为int
v1 = Value('c')#代表一个字符,abc
v2 = Value('u')#代表一个中文
p = Process(target=func,args=(num,v1,v2))# 传给子进程,子进程可以通过.value来进行修改
p.start()
p.join()
print(num.value) #888 主进程的数值是子进程中修改后的
print(v1.value)#a
print(v2.value)#武
=====================================================================
from multiprocessing import Process ,Value,Array
def f(fata_array):
data_array[0] = 666
if __name__ =="__main__":
arr = Array('i',[11,22,33,44])#数组,元素类型必须是int,只能是4个元素
p = Process(target = f,args=(arr,))
p=start()
p.join()
print(arr[:])
基于Manager()
from multiprocessing import Process, Manager
def f(d, i):
d[i] = '1'
d['2'] = '2'
d[0.25] = None
i.append(666)
if __name__ == "__main__":
with Manager() as manager:
d = manager.dict() # 创建一个字典
i = manager.list() # 创建一个列表
p = Process(target=f, args=(d, i)) # 将创建的字典列表传递给子进程
p.start()
p.join()
print(d)
print(i)
基于Queues 队列
队列一个放一个取,先进先出
对列是用的内存的空间
import multiprocessing
def task(q):
for i in range(10):
q.put(i) #put往队列里面放值
q.full() #查看队列里的数据是否到达最大个数
if __name__ == '__main__':
queue = multiprocessing.Queue(4) #主进程创建一个队列,队列里最多放4个数据 省略则不限制个数
p =multiprocessing.Process(target=task,args=(queue,)) # 将队列传递给子进程
p.start()
p.join()
print('主进程')
print(queue.get())#get从队列里面取值
基于Pipes 管道
管道是双向的,a可以发数据也可以取数据b也是
import time
import multiprocessing
def task(conn):
time.sleep(1)
comm.send([11,22,33,44]) #子进程发送数据
data = conn.recv() #阻塞
print('子进程接收',data)
time.sleep(2)
if __name__ =='__main__':
parent_coonn,child_conn = multiprocess.Pipe() #创建一个管道 parent_coonn,child_conn 相当于管道两端
p = multiprocess.Process(target = task,args=(child_conn,)) # 将管道传递给子进程
p.start()
info = parent_conn.recv() #阻塞 # 主进程等待接收
print('主进程接收',info)
parent_comm.send(666) # 主进程发送数据
进程锁
多个进程共享一个资源的时候会用到进程锁
# 数据混乱情况
import time
from multiprocessing import Process ,Value,Array
def func(n,):
n.value = n.value+1
if __name__ == '__main__':
num = value('i',0)
for i in range(20):
p = process(target=func,args=(num,)) #20个进程都去运行func
p.start()
time.sleep(3)
print(num.value)
# 数据混乱情况
import time
from multiprocessing import Process,manager
def f(d,):
d[1]+=1
if __name__ =='__main__':
with Manager() as manager:
d = manager.dict()
d[1]=0
for i in range(20):
p = Process(target=f,args=(d,))
p.start()
time.sleep(3)
print(d)
# 数据混乱情况
import time
import multiprocessing
def task():
with open('f1.txt',mode='r',encoding='utf-8') as f:
current_num = int(f.read())
print('排队抢票')
time.sleep(1)
current_num-=1
with open('f1.txt',mode='w',encoding='utf-8') as f:
f.write(str(current_num))
if __name__ =='__main__':
for i in range(20):
p = multiprocessing.Process(target=task)
p.start()
以上实例在进行操作的时候都会出现数据混乱的情况,就需要锁来介入
import time
import multiprocessing
def task(lock):
print('开始')
lock.acquire() #加锁
with open('f1.txt',mode='r',encoding='utf-8') as f:
current_num = int(f.read())
print('排队抢票')
time.sleep(1)
current_num-=1
with open('f1.txt',mode='w',encoding='utf-8') as f:
f.write(str(current_num))
lock.release() # 释放锁
if __name__ =='__main__':
multiprocessing.set_start_method('spawn')
lock = multiprocessing.Rlock() #创建进程锁
for i in range(10):
p = multiprocessing.Process(target=task,args=(lock,))#将锁传递给子进程,共享一把锁
p.start()
time.sleep(7)#spawn模式需要特殊处理等子进程运行完毕再继续往下执行
==============================================================================
import time
import multiprocessing
def task(lock):
print('开始')
lock.acquire()
with open('f1.txt',mode='r',encoding='utf-8') as f:
current_num = int(f.read())
print('排队抢票')
time.sleep(1)
current_num-=1
with open('f1.txt',mode='w',encoding='utf-8') as f:
f.write(str(current_num))
lock.release()
if __name__ =='__main__':
multiprocessing.set_start_method('spawn')
lock = multiprocessing.Rlock() #进程锁
process_list= []
for i in range(10):
p = multiprocessing.Process(target=task,args=(lock,))#子进程共享一把锁
p.start()
process_list.append(p)
for item in process_list:
item.join() # 等每个子进程运行完才往下走
进程池
import time
from concurrent.futures import ProcessPoolExecutor ,ThreadPoolExecutor
def task(num):
print('执行',num)
timr.sleep(2)
if __name__ = '__main__':
pool = ProcessPoolExecutor(4) #创建进程池最多创建4个进程
for i in range(10): #创建10个任务
pool.submit(task,i) #等待过程是在进程池内部执行的
import time
from concurrent.futures import ProcessPoolExecutor ,ThreadPoolExecutor
def task(num):
print('执行',num)
timr.sleep(2)
if __name__ = '__main__':
pool = ProcessPoolExecutor(4) #创建进程池最多创建4个进程
for i in range(10): #创建10个任务
pool.submit(task,i)
#等待进程池中的任务都执行完了再继续往下执行
#默认是主线程执行完,子进程同步这进行
pool.shutdown(True)
import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor ,ThreadPoolExecutor
def task(num):
print('执行',num)
time.sleep(2)
return num
def done(res):
print(multiprocessing.current_process()) #查看当前进程
time.sleep(1)
print(res.result())
time.sleep(1)
if __name__ == '__main__':
pool = ProcessPoolExecutor(4) #创建进程池最多创建4个进程
for i in range(10): #创建10个任务
fur = pool.submit(task,i) #fur为对象
#done的调用由主进程处理(与线程池不同)
fur.add_done_callback(done) #done函数中的参数res接收的就是线程执行函数的返回值,此处就是task函数的返回值
print(multiprocessing.current_process())
pool.shutdown(True)
注意,在进程池中要使用进程锁,需要基于Manager中的lock和Rlock来实现
import time
import multiprocessing
from concurrent.futures.process import processPoolExecutor
def task(lock):
with lock:
# 假设文件中就保存了一个10
with open('f1.txt',mode = 'r',encoding='utf-8') as f:
current_num = int(f.read())
print('抢票')
time.sleep(1)
current_num-=1
with open('f1.txt',mode ='w',encoding='utf-8') as f:
f.write(str(current_num))
if __name__ =='__main__':
pool = ProcessPoolExecutor()
#lock_object = multiprocessing.Rlock() 是不能使用的
manager = multiprocessing.Manager()
lock_object = manager.Rlock()
for i in range(10):
pool.submit(task,lock_object)
统计用户次数
import os
import time
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
def task(file_name, count_dict):
ip_set = set()
total_count = 0
ip_count = 0
file_path = os.path.join("files", file_name)
file_object = open(file_path, mode='r', encoding='utf-8')
for line in file_object:
if not line.strip():
continue
user_ip = line.split(" - -", maxsplit=1)[0].split(",")[0]
total_count += 1
if user_ip in ip_set:
continue
ip_count += 1
ip_set.add(user_ip)
count_dict[file_name] = {"total": total_count, 'ip': ip_count}
time.sleep(1)
def run():
# 根据目录读取文件并初始化字典
"""
1.读取目录下所有的文件,每个进程处理一个文件。
"""
pool = ProcessPoolExecutor(4)
with Manager() as manager:
"""
count_dict={
"20210322.log":{"total":10000,'ip':800},
}
"""
count_dict = manager.dict()
for file_name in os.listdir("files"):
pool.submit(task, file_name, count_dict)
pool.shutdown(True)
for k, v in count_dict.items():
print(k, v)
if __name__ == '__main__':
run()
import os
import time
from concurrent.futures import ProcessPoolExecutor
def task(file_name):
ip_set = set()
total_count = 0
ip_count = 0
file_path = os.path.join("files", file_name)
file_object = open(file_path, mode='r', encoding='utf-8')
for line in file_object:
if not line.strip():
continue
user_ip = line.split(" - -", maxsplit=1)[0].split(",")[0]
total_count += 1
if user_ip in ip_set:
continue
ip_count += 1
ip_set.add(user_ip)
time.sleep(1)
return {"total": total_count, 'ip': ip_count}
def outer(info, file_name):
def done(res, *args, **kwargs):
info[file_name] = res.result()
return done
def run():
# 根据目录读取文件并初始化字典
"""
1.读取目录下所有的文件,每个进程处理一个文件。
"""
info = {}
pool = ProcessPoolExecutor(4)
for file_name in os.listdir("files"):
fur = pool.submit(task, file_name)
fur.add_done_callback(outer(info, file_name)) # 回调函数:主进程
pool.shutdown(True)
for k, v in info.items():
print(k, v)
if __name__ == '__main__':
run()
僵尸进程与孤儿进程
僵尸进程是linux系统提供的一种数据结构,所有的子进程都要经过僵尸进程
子进程结束后不会把所有的信息清除掉,会保留一些状态信息,会清除掉内存空间等。最后由主进程发起请求回收掉僵尸进程。
孤儿进程是主进程结束了但是子进程没结束,linux系统上会有一个init进程,这个进程是所有进程的父进程,所有的孤儿进程由init进程接管。
生产者消费者模型
为什么要使用生产者消费者模型
生产者指的是生产数据的任务,消费者指的是处理数据的任务,在并发编程中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者和消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
这个阻塞队列就是用来给生产者和消费者解耦的
生产者消费者模型解决了生产者与消费者之间的速度差 程序解开耦合
协程
线程和进程是真是存在的
协程 是程序员通过代码实现的一个东西,非真实存在的
协程也被叫做微线程,是一种用户态内的上下文切换技术其实就是通过一个线程实现代码块相互切换执行 在开发中结合遇到到IO自动切换,就可以通过一个线程实现并发的操作
优点:开销小,属于程序级别的切换,操作系统感知不到更轻量级
单线程内实现并发的效果最大限度利用cpu
缺点:本质是单线程无法利用cpu多核、
是单个线程一旦协程出现阻塞将会阻塞整个线程
def func():
print(1)
print(2)
def func2():
print(3)
print(4)
func1()
func2()
#运行上述代码正常显示为1234如果使用协程技术就可以实现函数间的代码块切换执行,输出为1324
python中实现协程的方式:
基于greenlet模块
from greenlet import greenlet
def func1(name):
print(1) # 第2步 输出1
gr2.switch() #第3 切换到func2函数
print(2) #第6 输出2
gr2.switch #第7 切换到func2 ,从上次执行的位置继续向后执行
def func2():
print(3) #第4 输出3
gr1.switch() #第5 切换到func1,从上次执行的位置继续向后执行
print(4) #第8 输出4
gr1 = greenlet(func1) # 将函数进行包裹
gr2 = greenlet(func2)
gr1.switch('func1参数') #第一步 去执行func1函数
基于yiled
def func1():
yiled 1
yiled from func2()
def func2():
yiled 3
yiled 4
f1 = func1()
for item in f1:
print(item)
基于gevent
单线程下多个任务遇到io自动切换到另一个任务
#用法
g1=gevent.spawn(func,1,,2,3,x=4,y=5)创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的
g2=gevent.spawn(func2)
g1.join() #等待g1结束
g2.join() #等待g2结束
#或者上述两步合作一步:gevent.joinall([g1,g2])
g1.value#拿到func1的返回值
上例gevent.sleep(2)模拟的是gevent可以识别的io阻塞,
而time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了
from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面,如time,socket模块之前
或者我们干脆记忆成:要用gevent,需要将from gevent import monkey;monkey.patch_all()放到文件的开头
# pip3 install gevent
import gevent
def eat(name):
print('%s eat 1 ' %name)
gevent.sleep(3)
print('%s eat 2 ' %name)
def play(name):
print('%s play 1 ' %name)
gevent.sleep(3)
print('%s play 2 ' %name)
g1 = gevent.spawn(eat,'liu')
g2 = gevent.spawn(play,'yang')
g1.join()
g2.join()
协程的意义
遇到IO等待的时候自动切换
python在3.4之后退出asyncio模块+python3.5推出async,async语法,内部基于协程并且遇到IO请求自动化切换
import asyncio
async def func1():
print(1) #输出1
await asyncio.sleep(2)#io等待2s 在内部自动切换到任务2 2秒等待完成了线程自动切换回来往下执行,#不能使用time.sleep(),这样的话是同步,就不是异步;await就相当于yield from
print(2)
async def func2():
print(3)
await asyncio.sleep(2)
print(4)
# 创建两个任务, tasks就是一个任务列表
tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2()),
]
#在内部基于io loop 开始执行两个任务 内部让一个进程中的一个线程执行
loop = asyncio.get_event_loop()
loop.run_untill_complete(asyncio.wait(tasks))#将tasks放到loop中,进行事件循环, 这里必须传入的是一个list,任务列表只能放到asyncio.wait()中
---------------------------------------------------------------
event_loop:事件循环,相当于一个无限循环,可以把一些函数注册到这个无限
循环上,当满足某些条件的时候,函数就会被循环执行
coroutine:协程对象,可以将协程对象注册到事件循环中,他会被事件循环调用
可以使用async 关键字来定义一个方法,这个方法在调用时不会立即执行,而是
返回一个协程对象
task:任务对象,是对协程对象的进一步封装,包含了任务的各个状态
future:代表将来执行或者还没有执行的任务,实际上和task没有本质区别
协程中一旦出现同步的代码,就无法实现异步效果
------------------------------------------------------------------
async:定义一个协程,async修饰的函数,调用该函数之后返回一个协程对象
loop = asyncio.get_event_loop():创建一个事件循环对象
loop.run_untill_complete(协程对象):将协程对象注册到事件循环对象中,然后启动循环对象,该协程对象(也就是函数中的语句就会开始执行)就会开始执行。协程对象或任务对象注册到事件循环对象中就会执行协程对象或任务对象
task = loop.create_task(协程对象):基于loop(事件循环对象)创建一个task任务对象,task任务对象也可以注册到事件循环对象中
task = asyncio.ensure_future(协程对象):基于future的方式注册一个任务对象,该对象也可以被注册到事件循环对象中
task.add_done_callback(回调函数名);将回调函数(任务执行完成后会自动调用这个函数)绑定到任务对象中。会将任务对象自动传参给回调函数的参数,参数.result,返回的就是任务对象中封装的协程对象对应函数的返回值
await:用来挂起阻塞方法的执行,用来用来声明程序挂起,在asyncio中遇到阻塞操作必须进行手动挂起#比如异步程序执行到某一步时需要等待的时间很长,就将此挂起,去执行其他的异步程序。await 后面只能跟异步程序或有__await__属性的对象,因为异步程序与一般程序不同。假设有两个异步函数async a,async b,a中的某一步有await,当程序碰到关键字await b()后,异步程序挂起后去执行另一个异步b程序,就是从函数内部跳出去执行其他函数,当挂起条件消失后,不管b是否执行完,要马上从b程序中跳出来,回到原程序执行原来的操作。如果await后面跟的b函数不是异步函数,那么操作就只能等b执行完再返回,无法在b执行的过程中返回。如果要在b执行完才返回,也就不需要用await关键字了,直接调用b函数就行。所以这就需要await后面跟的是异步函数了。在一个异步函数中,可以不止一次挂起,也就是可以用多个await。
"""
需要先安装:pip3 install aiohttp
"""
import aiohttp
import asyncio
async def fetch(session, url):
print("发送请求:", url)
async with session.get(url, verify_ssl=False) as response:
content = await response.content.read()
file_name = url.rsplit('_')[-1]
with open(file_name, mode='wb') as file_object:
file_object.write(content)
async def main():
async with aiohttp.ClientSession() as session:
url_list = [
'https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
'https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
'https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
]
#3个任务
tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
# 3个任务开始运行
await asyncio.wait(tasks)
if __name__ == '__main__':
asyncio.run(main())
https://pythonav.com/wiki/detail/6/91
https://zhuanlan.zhihu.com/p/137057192
https://www.bilbili.com/video/BV1NA411g7yf
GIL锁
全局解释器锁(global interpreter lock)让一个进程中同一时刻只有一个线程可以被cpu调用,cpython特有的
使用计算机的多核优势,让那个cpu同时处理一些任务适用于多进程开发,资源开销大的时候
计算密集型:多进程,大量的数据计算
IO密集型:多线程,文件读写,网络传输
python Gil vs thread Lock
案例1 多线程socket
import socket
import threading
def task(conn):
while True:
client_data = conn.recv(1024)
data = client_data.decode('utf-8')
print("收到客户端发来的消息:", data)
if data.upper() == "Q":
break
conn.sendall("收到收到".encode('utf-8'))
conn.close()
def run():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('127.0.0.1', 8001))
sock.listen(5)
while True:
# 等待客户端来连接(主线程)
conn, addr = sock.accept()
# 创建子线程
t = threading.Thread(target=task, args=(conn,))
t.start()
sock.close()
if __name__ == '__main__':
run()
=============================================================================
import socket
# 1. 向指定IP发送连接请求
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', 8001))
while True:
txt = input(">>>")
client.sendall(txt.encode('utf-8'))
if txt.upper() == 'Q':
break
reply = client.recv(1024)
print(reply.decode("utf-8"))
# 关闭连接,关闭连接时会向服务端发送空数据。
client.close()
案例2 多进程socket
import socket
import multiprocessing
def task(conn):
while True:
client_data = conn.recv(1024)
data = client_data.decode('utf-8')
print("收到客户端发来的消息:", data)
if data.upper() == "Q":
break
conn.sendall("收到收到".encode('utf-8'))
conn.close()
def run():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('127.0.0.1', 8001))
sock.listen(5)
while True:
# 等待客户端来连接
conn, addr = sock.accept()
# 创建了子进程(至少有1个线程)
t = multiprocessing.Process(target=task, args=(conn,))
t.start()
sock.close()
if __name__ == '__main__':
run()
=========================================================================
import socket
# 1. 向指定IP发送连接请求
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', 8001))
while True:
txt = input(">>>")
client.sendall(txt.encode('utf-8'))
if txt.upper() == 'Q':
break
reply = client.recv(1024)
print(reply.decode("utf-8"))
# 关闭连接,关闭连接时会向服务端发送空数据。
client.close()
单例模式
之前写类每次执行类都会实例化一个类的对象
class Foo:
pass
obj1 = Foo()
obj2 =Foo()
print(obj1,obj2)
以后遇到开发会遇到单例模式,每次实例化类的对象时候,都是最开始创建的那个对象不再重复创建对象
基于 __new__
方法实现
class Singleton:
instance = None
def __init__(self,name):
self.name = name
def __new__(cla,*args,**kwargs):#返回对象
if cls.instance:
return cls.instance
cls.instance = object.__new__(cls)# 创建一个空对象
return cls.instance
obj1= Singleton('kil')
obj2= Singleton('hjk')
print(obj1,obj2)
# 多线程实例化单例模式的问题
import threading
import time
class Singleton:
instance = None
def __init__(self,name):
self.name = name
def __new__(cla,*args,**kwargs):#返回对象
if cls.instance:
return cls.instance
time.sleep(0.1) #10个线程都执行到这,然后往下执行就每个线程都创建了一个对象
cls.instance = object.__new__(cls)
return cls.instance
obj1= Singleton('kil')
obj2= Singleton('hjk')
print(obj1,obj2)
def task():
obj = Singleton('x')
print(obj)
for i in range(10):
t = threading.Thread(target=tsak)
t.start()
=====================================================================
# 解决多线程实例化单例模式的问题
import threading
import time
class Singleton:
instance = None
lock = threading.Rlock
def __init__(self,name):
self.name = name
def __new__(cla,*args,**kwargs):#返回对象
with cls.lock: #加上锁
if cls.instance:
return cls.instance
time.sleep(0.1)
cls.instance = object.__new__(cls)
return cls.instance
obj1= Singleton('kil')
obj2= Singleton('hjk')
print(obj1,obj2)
def task():
obj = Singleton('x')
print(obj)
for i in range(10):
t = threading.Thread(target=tsak)
t.start()
基于模块导入的方式
#首先有一个py文件 定义一个类 最后根据类创建一个对象
#对模块来说 第一次被导入成功 第二次导入的时候不会重新导入会使用第一次导入成功的
class Singleton:
def __init__(self):
delf.name = 'liuxiaoliu'
single = Singleton() #01
==================================================================
from XX import single #在导入的时候内部维护一个类并创建了对象,XX会从上往下执行也就是实例化了01
print(single)
# 在其他配置文件导入single的时候会先在内存中找,此时因为之前导入过一次所以可以直接找到
from XX import single
print(single)
同步调用与异步调用
是指提交任务的两种方式
同步调用:提交完任务后就在原地等待任务执行完毕,拿到结果,再执行下一行代码,导致串行执行(和阻塞不同,只是一种调用方式)
异步调用:提交完任务后不在原地等待任务执行完毕
IO模型
五种IO Model:
blocking IO
nonblocking IO
IO multiplexing
signal driven IO
asynchronous IO
由signal driven IO(信号驱动IO)在实际中并不常用,所以主要介绍其余四种IO Model。
IO发生时涉及的对象和步骤。对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另一个就是系统内核(kernel)。当一个read操作发生时,该操作会经历两个阶段:
1)等待数据准备 (Waiting for the data to be ready)
2)将数据从内核拷贝到进程中(Copying the data from the kernel to the process)
阻塞IO模型
网络IO 情况:建连接 收消息 发消息
在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:
当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。
而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,
然后kernel返回结果,用户进程才解除block的状态,重新运行起来。
所以,blocking IO的特点就是在IO执行的两个阶段(等待数据和拷贝数据两个阶段)都被block了。
几乎所有的程序员第一次接触到的网络编程都是从listen\(\)、send\(\)、recv\(\) 等接口开始的,
使用这些接口可以很方便的构建服务器/客户机的模型。然而大部分的socket接口都是阻塞型的。如下图
ps:
所谓阻塞型接口是指系统调用(一般是IO接口)不返回调用结果并让当前线程一直阻塞
只有当该系统调用获得结果或者超时出错时才返回。
实际上,除非特别指定,几乎所有的IO接口 ( 包括socket接口 ) 都是阻塞型的。这给网络编程带来了一个很大的问题,如在调用recv(1024)的同时,线程将被阻塞,在此期间,线程将无法执行任何运算或响应任何的网络请求。
一个简单的解决方案:
在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),
这样任何一个连接的阻塞都不会影响其他的连接。
该方案的问题是:
开启多进程或都线程的方式,在遇到要同时响应成百上千路的连接请求,则无论多线程还是多进程都会严重占据系统资源,
降低系统对外界响应效率,而且线程与进程本身也更容易进入假死状态。
改进方案:
很多程序员可能会考虑使用“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,
其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。“连接池”维持连接的缓存池,尽量重用已有的连接、
减少创建和关闭连接的频率。这两种技术都可以很好的降低系统开销,都被广泛应用很多大型系统,如websphere、tomcat和各种数据库等。
改进后方案其实也存在着问题:
“线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用IO接口带来的资源占用。而且,所谓“池”始终有其上限,
当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。所以使用“池”必须考虑其面临的响应规模,
并根据响应规模调整“池”的大小。
对应上例中的所面临的可能同时出现的上千甚至上万次的客户端请求,“线程池”或“连接池”或许可以缓解部分压力,但是不能解决所有问题。总之,多线程模型可以方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈,可以用非阻塞接口来尝试解决这个问题。
非阻塞IO模型
Linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:
从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是用户就可以在本次到下次再发起read询问的时间间隔内做其他事情,或者直接再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存(这一阶段仍然是阻塞的),然后返回。
也就是说非阻塞的recvform系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,
此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,
循环往复的进行recvform系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,
进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。
所以,在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。
非阻塞IO示例
#服务端
from socket import *
server = socket(AF_INET, SOCK_STREAM)
server.bind(('127.0.0.1',8099))
server.listen(5)
server.setblocking(False)
rlist=[]
wlist=[]
while True:
try:
conn, addr = server.accept()
rlist.append(conn)
print(rlist)
except BlockingIOError:
del_rlist=[]
# 收消息
for sock in rlist:
try:
data=sock.recv(1024)
if not data:
del_rlist.append(sock)
wlist.append((sock,data.upper()))
except BlockingIOError:
continue
except Exception:
sock.close()
del_rlist.append(sock)
del_wlist=[]
# 发消息
for item in wlist:
try:
sock = item[0]
data = item[1]
sock.send(data)
del_wlist.append(item)
except BlockingIOError:
pass
for item in del_wlist:
wlist.remove(item)
for sock in del_rlist:
rlist.remove(sock)
server.close()
#客户端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8080))
while True:
msg=input('>>: ')
if not msg:continue
c.send(msg.encode('utf-8'))
data=c.recv(1024)
print(data.decode('utf-8'))
但是非阻塞IO模型绝不被推荐。
我们不能否则其优点:能够在等待任务完成的时间里干其他活了(包括提交其他任务,也就是 “后台” 可以有多个任务在“”同时“”执行)。
但是也难掩其缺点:
1. 循环调用recv()将大幅度推高CPU占用率;这也是我们在代码中留一句time.sleep(2)的原因,否则在低配主机下极容易出现卡机情况
2. 任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。
这会导致整体数据吞吐量的降低。
此外,在这个方案中recv()更多的是起到检测“操作是否完成”的作用,实际操作系统提供了更为高效的检测“操作是否完成“作用的接口,例如select()多路复用模式,可以一次检测多个连接是否活跃。
多路复用IO模型
IO multiplexing这个词可能有点陌生,但是如果我说select/epoll,大概就都能明白了。有些地方也称这种IO方式为事件驱动IO
(event driven IO)。我们都知道,select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。它的流程如图:
# select 是IO多路复用模型的一种
当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,
当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
这个图和blocking IO的图其实并没有太大的不同,事实上还更差一些。因为这里需要使用两个系统调用\(select和recvfrom\),
而blocking IO只调用了一个系统调用\(recvfrom\)。但是,用select的优势在于它可以同时处理多个connection。
强调:
1. 如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。
2. 在多路复用模型中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。
结论: select的优势在于可以处理多个连接,不适用于单个连接
select网络IO模型示例
#服务端
from socket import *
import select
server = socket(AF_INET, SOCK_STREAM)
server.bind(('127.0.0.1',8093))
server.listen(5)
server.setblocking(False)
print('starting...')
rlist=[server,] # 收消息套接字 建连接和收消息
wlist=[] # 发消息套接字
wdata={}
while True:
rl,wl,xl=select.select(rlist,wlist,[],0.5)
print(wl)
for sock in rl:
if sock == server: # 干建连接的活
conn,addr=sock.accept()
rlist.append(conn)
else:
try:
data=sock.recv(1024)
if not data:
sock.close()
rlist.remove(sock)
continue
wlist.append(sock)
wdata[sock]=data.upper()
except Exception:
sock.close()
rlist.remove(sock)
for sock in wl:
sock.send(wdata[sock])
wlist.remove(sock)
wdata.pop(sock)
#客户端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8081))
while True:
msg=input('>>: ')
if not msg:continue
c.send(msg.encode('utf-8'))
data=c.recv(1024)
print(data.decode('utf-8'))
select监听fd变化的过程分析:
用户进程创建socket对象,拷贝监听的fd到内核空间,每一个fd会对应一张系统文件表,内核空间的fd响应到数据后,
就会发送信号给用户进程数据已到;
用户进程再发送系统调用,比如(accept)将内核空间的数据copy到用户空间,同时作为接受数据端内核空间的数据清除,
这样重新监听时fd再有新的数据又可以响应到了(发送端因为基于TCP协议所以需要收到应答后才会清除)。
该模型的优点:
相比其他模型,使用select() 的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU,同时能够为多客户端提供服务。
如果试图建立一个简单的事件驱动的服务器程序,这个模型有一定的参考价值。
该模型的缺点:
首先select()接口并不是实现“事件驱动”的最好选择。因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄。
很多操作系统提供了更为高效的接口,如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。
如果需要实现更高效的服务器程序,类似epoll这样的接口更被推荐。遗憾的是不同的操作系统特供的epoll接口有很大差异,
所以使用类似于epoll的接口实现具有较好跨平台能力的服务器会比较困难。
其次,该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的
异步IO模型
Linux下的asynchronous IO其实用得不多,从内核2.6版本才开始引入。先看一下它的流程:
用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。
selectors模快
IO复用:为了解释这个名词,首先来理解下复用这个概念,复用也就是共用的意思,这样理解还是有些抽象,
为此,咱们来理解下复用在通信领域的使用,在通信领域中为了充分利用网络连接的物理介质,
往往在同一条网络链路上采用时分复用或频分复用的技术使其在同一链路上传输多路信号,到这里我们就基本上理解了复用的含义,
即公用某个“介质”来尽可能多的做同一类(性质)的事,那IO复用的“介质”是什么呢?为此我们首先来看看服务器编程的模型,
客户端发来的请求服务端会产生一个进程来对其进行服务,每当来一个客户请求就产生一个进程来服务,然而进程不可能无限制的产生,
因此为了解决大量客户端访问的问题,引入了IO复用技术,即:一个进程可以同时对多个客户请求进行服务。
也就是说IO复用的“介质”是进程(准确的说复用的是select和poll,因为进程也是靠调用select和poll来实现的),
复用一个进程(select和poll)来对多个IO进行服务,虽然客户端发来的IO是并发的但是IO所需的读写数据多数情况下是没有准备好的,
因此就可以利用一个函数(select和poll)来监听IO所需的这些数据的状态,一旦IO有数据可以进行读写了,进程就来对这样的IO进行服务。
理解完IO复用后,我们在来看下实现IO复用中的三个API(select、poll和epoll)的区别和联系
select,poll,epoll都是IO多路复用的机制,I/O多路复用就是通过一种机制,可以监视多个描述符,一旦某个描述符就绪(
一般是读就绪或者写就绪),能够通知应用程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,
因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,
异步I/O的实现会负责把数据从内核拷贝到用户空间。三者的原型如下所示:
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
1.select的第一个参数nfds为fdset集合中最大描述符值加1,fdset是一个位数组,其大小限制为__FD_SETSIZE(1024),
位数组的每一位代表其对应的描述符是否需要被检查。第二三四参数表示需要关注读、写、错误事件的文件描述符位数组,
这些参数既是输入参数也是输出参数,可能会被内核修改用于标示哪些描述符上发生了关注的事件,
所以每次调用select前都需要重新初始化fdset。timeout参数为超时时间,该结构会被内核修改,其值为超时剩余的时间。
select的调用步骤如下:
(1)使用copy_from_user从用户空间拷贝fdset到内核空间
(2)注册回调函数__pollwait
(3)遍历所有fd,调用其对应的poll方法(对于socket,这个poll方法是sock_poll,sock_poll根据情况会调用到tcp_poll,udp_poll
或者datagram_poll)
(4)以tcp_poll为例,其核心实现就是__pollwait,也就是上面注册的回调函数。
(5)__pollwait的主要工作就是把current(当前进程)挂到设备的等待队列中,不同的设备有不同的等待队列,对于tcp_poll 来说,
其等待队列是sk->sk_sleep(注意把进程挂到等待队列中并不代表进程已经睡眠了)。在设备收到一条消息(网络设备)或填写完文件数据
(磁盘设备)后,会唤醒设备等待队列上睡眠的进程,这时current便被唤醒了。
(6)poll方法返回时会返回一个描述读写操作是否就绪的mask掩码,根据这个mask掩码给fd_set赋值。
(7)如果遍历完所有的fd,还没有返回一个可读写的mask掩码,则会调用schedule_timeout是调用select的进程(也就是 current)
进入睡眠。当设备驱动发生自身资源可读写后,会唤醒其等待队列上睡眠的进程。如果超过一定的超时时间(schedule_timeout 指定),
还是没人唤醒,则调用select的进程会重新被唤醒获得CPU,进而重新遍历fd,判断有没有就绪的fd。
(8)把fd_set从内核空间拷贝到用户空间。
总结下select的几大缺点:
(1)每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
(2)同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大
(3)select支持的文件描述符数量太小了,默认是1024
2. poll与select不同,通过一个pollfd数组向内核传递需要关注的事件,故没有描述符个数的限制,pollfd中的events字段和revents分别
用于标示关注的事件和发生的事件,故pollfd数组只需要被初始化一次。
poll的实现机制与select类似,其对应内核中的sys_poll,只不过poll向内核传递pollfd数组,然后对pollfd中的每个描述符进行poll,
相比处理fdset来说,poll效率更高。poll返回后,需要对pollfd中的每个元素检查其revents值,来得指事件是否发生。
3.直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。
epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,
那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。
epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,
而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,
这样便彻底省掉了这些文件描述符在系统调用时复制的开销。另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,
进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,
一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。
epoll既然是对select和poll的改进,就应该能避免上述的三个缺点。那epoll都是怎么解决的呢?在此之前,我们先看一下epoll 和select和
poll的调用接口上的不同,select和poll都只提供了一个函数——select或者poll函数。而epoll提供了三个函 数,
epoll_create,epoll_ctl和epoll_wait,epoll_create是创建一个epoll句柄;epoll_ctl是注 册要监听的事件类型;
epoll_wait则是等待事件的产生。
对于第一个缺点,epoll的解决方案在epoll_ctl函数中。每次注册新的事件到epoll句柄中时(在epoll_ctl中指定 EPOLL_CTL_ADD),
会把所有的fd拷贝进内核,而不是在epoll_wait的时候重复拷贝。epoll保证了每个fd在整个过程中只会拷贝 一次。
对于第二个缺点,epoll的解决方案不像select或poll一样每次都把current轮流加入fd对应的设备等待队列中,而只在 epoll_ctl时把
current挂一遍(这一遍必不可少)并为每个fd指定一个回调函数,当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调 函数,
而这个回调函数会把就绪的fd加入一个就绪链表)。epoll_wait的工作实际上就是在这个就绪链表中查看有没有就绪的fd
(利用 schedule_timeout()实现睡一会,判断一会的效果,和select实现中的第7步是类似的)。
对于第三个缺点,epoll没有这个限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,
在1GB内存的机器上大约是10万左右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。
总结:
(1)select,poll实现需要自己不断轮询所有fd集合,直到设备就绪,期间可能要睡眠和唤醒多次交替。而epoll其实也需要调用 epoll_wait
不断轮询就绪链表,期间也可能多次睡眠和唤醒交替,但是它是设备就绪时,调用回调函数,把就绪fd放入就绪链表中,并唤醒在 epoll_wait中
进入睡眠的进程。虽然都要睡眠和交替,但是select和poll在“醒着”的时候要遍历整个fd集合,而epoll在“醒着”的 时候只要判断一下就绪链表
是否为空就行了,这节省了大量的CPU时间,这就是回调机制带来的性能提升。
(2)select,poll每次调用都要把fd集合从用户态往内核态拷贝一次,并且要把current往设备等待队列中挂一次,而epoll只要 一次拷贝,
而且把current往等待队列上挂也只挂一次(在epoll_wait的开始,注意这里的等待队列并不是设备等待队列,只是一个epoll内
部定义的等待队列),这也能节省不少的开销。
这三种IO多路复用模型在不同的平台有着不同的支持,而epoll在windows下就不支持,好在我们有selectors模块,帮我们默认选择当前平台下最合适的
#服务端
from socket import *
import selectors
sel=selectors.DefaultSelector()
def accept(server_fileobj,mask):
conn,addr=server_fileobj.accept()
sel.register(conn,selectors.EVENT_READ,read)
def read(conn,mask):
try:
data=conn.recv(1024)
if not data:
print('closing',conn)
sel.unregister(conn)
conn.close()
return
conn.send(data.upper()+b'_SB')
except Exception:
print('closing', conn)
sel.unregister(conn)
conn.close()
server_fileobj=socket(AF_INET,SOCK_STREAM)
server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server_fileobj.bind(('127.0.0.1',8088))
server_fileobj.listen(5)
server_fileobj.setblocking(False) #设置socket的接口为非阻塞
sel.register(server_fileobj,selectors.EVENT_READ,accept) #相当于网select的读列表里append了一个文件句柄
#server_fileobj,并且绑定了一个回调函数accept
while True:
events=sel.select() #检测所有的fileobj,是否有完成wait data的
for sel_obj,mask in events:
callback=sel_obj.data #callback=accpet
callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)
#客户端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8088))
while True:
msg=input('>>: ')
if not msg:continue
c.send(msg.encode('utf-8'))
data=c.recv(1024)
print(data.decode('utf-8'))