threading 模块
import threadingdef fun1():print('fun1')time.sleep(2)print('fun1 end')def fun2(n):print('fun2',n)time.sleep(4)print('fun2 end')t1 = threading.Thread(target=fun1)t2 = threading.Thread(target=fun2, args=(1,))# 设置守护线程t1.setDaemon(True) # 主线程结束后(主线程要等非守护线程执行结束后才会结束),t1也结束t1.start()t2.start()t1.join() # 只能等t1线程结束后才继续执行主线程print('zhu') # 等t1结束后打印
class MyThread(threading.Thread):def __init__(self):super().__init__()def run(self):passt1 = MyThread()t2 = MyThread()t1.start() # 执行run方法t2.start()
其他方法
Thread实例对象的方法
isAlive():返回线程是否活动getName():返回线程名setName():设置线程名
threading模块提供的方法
threading.currentThread():返回当前的线程变量threading.enumerate():返回一个包含正在运行的线程的list,正在运行指线程启动后、结束前,不包括启动前和结束后的线程threading.activeCount():返回正在运行的线程数量,与len(threading.enumerate())有相同结果
GIL(全局解释器锁)
Python的多线程:由于GIL, 导致同一时刻,同一进程只能有一个线程被运行
锁
互斥锁
对公共数据保护
只能有一个线程执行
lock = threading.Lock() # 互斥锁lock.acquire() # 加锁之间的代码只能等一个线程执行完其他线程才能执行lock.release() # 解锁
递归锁
解决死锁问题
rlock = threading.RLock()# 锁中有个计数器 acquire 则加1 release则减1 ,计数器大于0,则无法拿到锁rlock.acquire() # +1...rlock.acquire() # +1...rlock.release() # -1...rlock.release() # -1
Event对象
event = threading.Event() # flag = Falseevent.isSet() # 返回event的状态值event.wait() #如果 event.isSet()==False将阻塞线程;event.set() # 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;event.clear() #恢复event的状态值为False。
Queue 队列
先进先出
优点:线程安全
import queueq = queue.Queue(4) # 可put4个值q.put(111)q.put(222)q.get()q.get()q.get() # 没有数据卡主 q.get(False) 没有数据报错
join task_done
join()阻塞进程,直到所有任务完成,需要配合task_done
其他常用方法
q.qsize() # 返回队列的大小q.empty() # 如果队列为空 Trueq.full() # 队列是否满q.get_nowait() # 相对于q.get(False)
其他模式
先进后出
q = queue.LifoQueue() # last in firest out
优先级
q = queue.PriorityQueue()q.put([2, 'data1'])q.put([4, 'data2'])q.put([1, 'data3'])q.get() # data3q.get() # data1q.get() # data2
生产者消费者模型
multiprocessing模块
进程模块,并行运行
与theading同api
协程
- 协程的好处:
- 无需线程上下文切换的开销
- 无需原子操作锁定及同步的开销
- 方便切换控制流,简化编程模型
- 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。
- 缺点:
- 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序
- 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
greenlet
生成器函数或者协程函数中的yield语句挂起函数的执行,直到稍后使用next()或send()操作进行恢复为止。可以使用一个调度器循环在一组生成器函数之间协作多个任务。greentlet是python中实现我们所谓的”Coroutine(协程)”的一个基础库.
from greenlet import greenletdef test1():print (12)gr2.switch()print (34)gr2.switch()def test2():print (56)gr1.switch()print (78)gr1 = greenlet(test1)gr2 = greenlet(test2)gr1.switch()
gevent
基于greenlet的框架
当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO
import geventfrom gevent import monkeyfrom urllib import requestimport timedef f(url):print('GET: %s' % url)resp = request.urlopen(url)data = resp.read()print('%d bytes received from %s.' % (len(data), url))start=time.time()monkey.patch_all()gevent.joinall([gevent.spawn(f, 'https://itk.org/'),gevent.spawn(f, 'https://www.github.com/'),gevent.spawn(f, 'https://zhihu.com/'),])# f('https://itk.org/')# f('https://www.github.com/')# f('https://zhihu.com/')print(time.time()-start)
eventlet
基于greenlet的框架
IO模型
- 阻塞IO
- 全程阻塞
- 非阻塞IO
- 发送多次系统调用;
- 优点:wait for data时无阻塞
- 缺点:1.系统调用太多 2.数据不是事实接收的
- wait for data 非阻塞;copy data 阻塞
- IO多路复用(监听多个连接)
- 全程阻塞
- 能监听多个文件描述符,实现并发
- 异步IO
- 全程无阻塞
- 驱动信号
IO多路复用
select模块
缺点:
- 每次调用select都要将所有的fd(文件描述符)拷贝到内核控件,导致效率下降
- 遍历所有的fd,是否有数据访问
- 最大连接数(1024),超过不再监听
import socketimport selectservice = socket.socket()service.bind(('127.0.0.1', 8080))service.listen(5)service.setblocking(False)inputs = [service]while True:r, w, e = select.select(inputs, [], [], 5)for obj in r:if obj == service:conn, addr = obj.accept()print(conn)inputs.append(conn)else:try:data = obj.recv(1024)if not data:inputs.remove(obj)continueprint(data.decode('utf-8'))recv_data = input('>>>') obj.send(recv_data.encode('utf-8'))except ConnectionError:inputs.remove(obj)
poll、epoll
windows下没有只有在linux中
poll 和 select 唯一区别,poll没有连接数限制
epoll:
- 第一个函数:创建epoll句柄:将所有的fd拷贝到内核控件,但是只需拷贝一次
- 回调函数:为所有的fd绑定一个回调函数,一旦有数据访问,触发该回调函数,回调函数将fd放到链表中
- 第三个函数:判断链表是否为空
selectors
会根据平台自动选择最佳IO多路复用机制,Linux中选择epoll
import socketimport selectorsdef accept(sock, mask):conn, addr = sock.accept()# 将conn进行注册sel.register(conn, selectors.EVENT_READ, interact)def interact(conn, mask):try:recv_data = conn.recv(1024)if not recv_data:sel.unregister(conn)returnprint(recv_data.decode('utf-8'))send_data = input('>>>')conn.send(send_data.encode('utf-8'))except ConnectionError:sel.unregister(conn)service = socket.socket()service.bind(('127.0.0.1', 8000))service.listen(5)service.setblocking(False)sel = selectors.DefaultSelector() # 自动选择IO多路复用机制 linux :epollsel.register(service, selectors.EVENT_READ, accept) # 注册servicewhile True:events = sel.select() # 监听for key, mask in events:func = key.dataobj = key.fileobjfunc(obj, mask) # accept(service, mask) interact(conn, mask)
