进程和线程的关系

一个线程只属于一个进程,而一个进程可以有多个线程,至少有一个线程。
资源分配给进程,同一个进程的线程共享该进程的所有资源。
CPU分配给线程,即真正在CPU上运行的是线程。

并行处理

是计算机中能同时执行两个或者多个处理的计算方式。并行处理可以同时工作于同一程序的不同方面。并行处理是解决大型和复杂问题滴解决时间

并发处理

一个时间段中有几个程序都处于已启动运行到运行结束的时间,且几个程序都在同一个CPU上运行。
并发的关键是你有处理多个任务的能力,但不一定要同时。并行的关键是你要拥有同时处理几个任务的能力;所有并行是并发的子集

通过调用multiprocessing.Process实现多进程

  • Python提供了非常好用的多进程包multiprocessing,借助这个包,可以轻松完成从单进程到并发执行的转换。
  • multiprocessing模块提供了一个Process类来创建一个进程对象。
  • 如果用循环创建多个进程时,可以把多个子进程对象加到list中,在循环外再循环执行list中的每个对象的join()方法以达到主进程等待所有子进程都结束。
  1. import timefrom multiprocessing import Process
  2. def sing(name):
  3. for i in range(10):
  4. print(name+"唱歌")
  5. time.sleep(1)
  6. def dance():
  7. for i in range(10):
  8. print("跳舞")
  9. time.sleep(1)
  10. if __name__ == '__main__':
  11. p1 = Process(target=sing, args=('林志凌',), name="sing")
  12. p2 = Process(target=dance , name="dance")
  13. p1.start()
  14. print(p1.name)
  15. p2.start()
  16. print(p2.name)
  17. p1.join() #join() 主进程等待子进程结束
  18. p2.join()

Process 类常用方法

  • p.start():启动进程,并调用该子进程中的p.run()
  • p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
  • p.terminate() 强制终止进程p,不会进行任何清理操作
  • p.is_alive() 如果p仍然运行,返回True,用于判断进程是否在运行
  • p.join([timeout]) 主进程等待子进程结束,timeout是可选的超时时间。
  • Process类常用属性
  • name:当前进程实例别名,默认为Process-N,N从1开始递增
  • pid:当前进程实例的PID值

通过继承Process类创建多进程

  1. import multiprocessing
  2. import time
  3. class ClockProcess(multiprocessing.Process):
  4. def __init__(arg):
  5. super().__init__()
  6. self.arg = arg
  7. def run(self):
  8. n = self.arg
  9. while n>0:
  10. print(n)
  11. time.sleep(1)
  12. n -= 1
  13. if __name__ == '__main__':
  14. p = ClockProcess(5)
  15. p.start()
  16. p.join()

进程池-Pool

  • 进程池:用来创建多个进程
  • 当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态生成多个进程,但如果要创建的子进程数很多时,此时可以用到multiprocessing模块提供的Pool
  • 初始化Pool时,可以指定一个最大进程数, 当有新的请求提交到Pool中时,如果进程池还没有满,那么就会创建一个新的进程用来执行该请求;但如果进程池中的进程数已经达到最大值,那么该请求就会等待,直到进程池中有进程结束,才会创建新的进程来执行。
  1. from multiprocessing import Pool
  2. import random, time
  3. def work(num):
  4. print(random.random() * num)
  5. time.sleep(3)
  6. if __name__ == '__main__':
  7. po = Pool(4) #如果不写默认为当前CPU核数
  8. for i in range(10):
  9. po.apply_async(work, (i,))
  10. po.close()
  11. po.join()
  • multiprocessing.Pool常用函数解析:
  • apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None):使用非阻塞方式调用fun(并行执行,阻塞方式必须等上一个进程退出才能执行下一个进程)args为传递给func的参数列表,kwds为传递给func的关键字参数列表;
  • apply(func[, args[, kwds]]) (了解即可几乎不用)使用阻塞的方式调用func
  • close():关闭Pool,使其不再接受新的任务;
  • terminate():不管任务是否完成,立即终止;
  • join():主进程阻塞,等待子进程退出,必须在close()或terminate之后使用;

五、 进程间通信-Queue

  • 可以使用multiprocessing模块的Queue实现多进程间的数据传递
  • 初始化Queue()对象时(例如:q = Queue()),若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限
  • Queue.qsize():返回当前队列包含的消息数量
  • Queue.empty():如果队列为空,返回True,反之False
  • Queue.full():如果队列满了,返回True,反之False
  • Queue.get([block[, timeout]]):获取队列中的一条消息,然后将其从队列中移除,block默认值为True
    • 如果block使用默认值,且没有设置timeout(单位移),消息队列如果为空,此时程序将被阻塞(停在读取状态),直到从消息队列读到消息为止,如果设置了timeout,则会等待timeout秒,若还没读取到任何消息,则抛出“Queue.Empty”异常
    • 如果block值为False,消息队列如果为空,则会立刻抛出“Queue.Empty”异常。
  • Queue.get_nowait() :相当于Queue.get(False)
  • Queue.put(item, [block, [, timeout]]) :将item消息写入队列,block默认值为True
    • 如果block使用默认值,且没有设置timeout,消息队列如果已没有空间写入,此时程序将被阻塞(停在写入状态),直到从消息队列中腾出空间为止,如果设置了True和timeout,则会等待timeout秒,若还没有空间,则抛出”Queue.Full“异常
    • 如果block值为False,消息队列如果没有空间可写入,则会立刻抛出”Queue.Full“异常
  • Queue.put_nowait(itme) :相当于Queue.put(item, False)
  1. import time
  2. from multiprocessing import Queue, Process
  3. def write(q)
  4. for value in ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']:
  5. print("开始写入:", value)
  6. q.put(value)
  7. time.sleep(1)
  8. def read(q):
  9. while True:
  10. if not q.empty():
  11. print("读取到的是", q.get())
  12. time.sleep(1)
  13. else:
  14. break
  15. if __name__ == '__main__':
  16. q = Queue()
  17. pw = Process(target=write, args=(q,))
  18. pr = Process(target=read, args=(q,))
  19. pw.start()
  20. time.sleep(1)
  21. pr.start()
  22. pw.join()
  23. pr.join()
  24. print('接收完毕!')

进程池之间通信- Manager().Queue()

  • 进程池之间通信,就需要使用multiprocessing.Manager()中的Queue()而不是multiprocessing.Queue()
  • 否则会得到一条如下错误信息: ```python import time from multiprocessing import Manager, Pool

def writer(q): for i in “welcome”: print(“开始写入”, i) q.put(i)

def reader(q): time.sleep(3) for i in range(q.qsize()): print(“得到消息”, q.get())

if name == “main“: print(“主进程启动”) q = Manager().Queue() po = Pool() po.apply_async(writer, (q,)) po.apply_async(reader, (q,)) po.close() po.join() ```