threading 模块

  1. import threading
  2. def fun1():
  3. print('fun1')
  4. time.sleep(2)
  5. print('fun1 end')
  6. def fun2(n):
  7. print('fun2',n)
  8. time.sleep(4)
  9. print('fun2 end')
  10. t1 = threading.Thread(target=fun1)
  11. t2 = threading.Thread(target=fun2, args=(1,))
  12. # 设置守护线程
  13. t1.setDaemon(True) # 主线程结束后(主线程要等非守护线程执行结束后才会结束),t1也结束
  14. t1.start()
  15. t2.start()
  16. t1.join() # 只能等t1线程结束后才继续执行主线程
  17. print('zhu') # 等t1结束后打印
  1. class MyThread(threading.Thread):
  2. def __init__(self):
  3. super().__init__()
  4. def run(self):
  5. pass
  6. t1 = MyThread()
  7. t2 = MyThread()
  8. t1.start() # 执行run方法
  9. t2.start()

其他方法

Thread实例对象的方法

  1. isAlive():返回线程是否活动
  2. getName():返回线程名
  3. setName():设置线程名

threading模块提供的方法

  1. threading.currentThread():返回当前的线程变量
  2. threading.enumerate():返回一个包含正在运行的线程的list,正在运行指线程启动后、结束前,不包括启动前和结束后的线程
  3. threading.activeCount():返回正在运行的线程数量,与len(threading.enumerate())有相同结果

GIL(全局解释器锁)

Python的多线程:由于GIL, 导致同一时刻,同一进程只能有一个线程被运行

互斥锁

对公共数据保护
只能有一个线程执行

  1. lock = threading.Lock() # 互斥锁
  2. lock.acquire() # 加锁
  3. 之间的代码只能等一个线程执行完其他线程才能执行
  4. lock.release() # 解锁

递归锁

解决死锁问题

  1. rlock = threading.RLock()
  2. # 锁中有个计数器 acquire 则加1 release则减1 ,计数器大于0,则无法拿到锁
  3. rlock.acquire() # +1
  4. ...
  5. rlock.acquire() # +1
  6. ...
  7. rlock.release() # -1
  8. ...
  9. rlock.release() # -1

Event对象

  1. event = threading.Event() # flag = False
  2. event.isSet() # 返回event的状态值
  3. event.wait() #如果 event.isSet()==False将阻塞线程;
  4. event.set() # 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
  5. event.clear() #恢复event的状态值为False。

Queue 队列

先进先出
优点:线程安全

  1. import queue
  2. q = queue.Queue(4) # 可put4个值
  3. q.put(111)
  4. q.put(222)
  5. q.get()
  6. q.get()
  7. q.get() # 没有数据卡主 q.get(False) 没有数据报错

join task_done

join()阻塞进程,直到所有任务完成,需要配合task_done

其他常用方法

  1. q.qsize() # 返回队列的大小
  2. q.empty() # 如果队列为空 True
  3. q.full() # 队列是否满
  4. q.get_nowait() # 相对于q.get(False)

其他模式

先进后出

  1. q = queue.LifoQueue() # last in firest out

优先级

  1. q = queue.PriorityQueue()
  2. q.put([2, 'data1'])
  3. q.put([4, 'data2'])
  4. q.put([1, 'data3'])
  5. q.get() # data3
  6. q.get() # data1
  7. q.get() # data2

生产者消费者模型

multiprocessing模块

进程模块,并行运行
与theading同api

协程

  • 协程的好处:
    • 无需线程上下文切换的开销
    • 无需原子操作锁定及同步的开销
    • 方便切换控制流,简化编程模型
    • 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。
  • 缺点:
    • 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
      进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

greenlet

生成器函数或者协程函数中的yield语句挂起函数的执行,直到稍后使用next()或send()操作进行恢复为止。可以使用一个调度器循环在一组生成器函数之间协作多个任务。greentlet是python中实现我们所谓的”Coroutine(协程)”的一个基础库.

  1. from greenlet import greenlet
  2. def test1():
  3. print (12)
  4. gr2.switch()
  5. print (34)
  6. gr2.switch()
  7. def test2():
  8. print (56)
  9. gr1.switch()
  10. print (78)
  11. gr1 = greenlet(test1)
  12. gr2 = greenlet(test2)
  13. gr1.switch()

gevent

基于greenlet的框架
当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO

  1. import gevent
  2. from gevent import monkey
  3. from urllib import request
  4. import time
  5. def f(url):
  6. print('GET: %s' % url)
  7. resp = request.urlopen(url)
  8. data = resp.read()
  9. print('%d bytes received from %s.' % (len(data), url))
  10. start=time.time()
  11. monkey.patch_all()
  12. gevent.joinall([
  13. gevent.spawn(f, 'https://itk.org/'),
  14. gevent.spawn(f, 'https://www.github.com/'),
  15. gevent.spawn(f, 'https://zhihu.com/'),
  16. ])
  17. # f('https://itk.org/')
  18. # f('https://www.github.com/')
  19. # f('https://zhihu.com/')
  20. print(time.time()-start)

eventlet

基于greenlet的框架

IO模型

  1. 阻塞IO
    • 全程阻塞
  2. 非阻塞IO
    • 发送多次系统调用;
    • 优点:wait for data时无阻塞
    • 缺点:1.系统调用太多 2.数据不是事实接收的
    • wait for data 非阻塞;copy data 阻塞
  3. IO多路复用(监听多个连接)
    • 全程阻塞
    • 能监听多个文件描述符,实现并发
  4. 异步IO
    • 全程无阻塞
  5. 驱动信号

IO多路复用

select模块

缺点:

  • 每次调用select都要将所有的fd(文件描述符)拷贝到内核控件,导致效率下降
  • 遍历所有的fd,是否有数据访问
  • 最大连接数(1024),超过不再监听
  1. import socket
  2. import select
  3. service = socket.socket()
  4. service.bind(('127.0.0.1', 8080))
  5. service.listen(5)
  6. service.setblocking(False)
  7. inputs = [service]
  8. while True:
  9. r, w, e = select.select(inputs, [], [], 5)
  10. for obj in r:
  11. if obj == service:
  12. conn, addr = obj.accept()
  13. print(conn)
  14. inputs.append(conn)
  15. else:
  16. try:
  17. data = obj.recv(1024)
  18. if not data:
  19. inputs.remove(obj)
  20. continue
  21. print(data.decode('utf-8'))
  22. recv_data = input('>>>') obj.send(recv_data.encode('utf-8'))
  23. except ConnectionError:
  24. inputs.remove(obj)

poll、epoll

windows下没有只有在linux中
poll 和 select 唯一区别,poll没有连接数限制

epoll:

  • 第一个函数:创建epoll句柄:将所有的fd拷贝到内核控件,但是只需拷贝一次
  • 回调函数:为所有的fd绑定一个回调函数,一旦有数据访问,触发该回调函数,回调函数将fd放到链表中
  • 第三个函数:判断链表是否为空

selectors

会根据平台自动选择最佳IO多路复用机制,Linux中选择epoll

  1. import socket
  2. import selectors
  3. def accept(sock, mask):
  4. conn, addr = sock.accept()
  5. # 将conn进行注册
  6. sel.register(conn, selectors.EVENT_READ, interact)
  7. def interact(conn, mask):
  8. try:
  9. recv_data = conn.recv(1024)
  10. if not recv_data:
  11. sel.unregister(conn)
  12. return
  13. print(recv_data.decode('utf-8'))
  14. send_data = input('>>>')
  15. conn.send(send_data.encode('utf-8'))
  16. except ConnectionError:
  17. sel.unregister(conn)
  18. service = socket.socket()
  19. service.bind(('127.0.0.1', 8000))
  20. service.listen(5)
  21. service.setblocking(False)
  22. sel = selectors.DefaultSelector() # 自动选择IO多路复用机制 linux :epoll
  23. sel.register(service, selectors.EVENT_READ, accept) # 注册service
  24. while True:
  25. events = sel.select() # 监听
  26. for key, mask in events:
  27. func = key.data
  28. obj = key.fileobj
  29. func(obj, mask) # accept(service, mask) interact(conn, mask)