Differences between multiprocessing and multithreading

  • Because of the GIL, Python threads cannot take full advantage of a multi-core CPU, so multithreading only achieves real concurrency for I/O-bound tasks.
  • For CPU-bound (typically computational) logic, multiprocessing is far more efficient on a multi-core CPU.
  • The operating system's cost of scheduling a process is much higher than that of scheduling a thread, as the sketch after this list illustrates.
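
A minimal sketch of that overhead gap, timing the start/join of short-lived threads versus processes. The numbers will vary by machine and OS, and the iteration count of 50 is an arbitrary choice:

  import time
  import threading
  import multiprocessing

  def noop():  # placeholder task, used only for timing
      pass

  if __name__ == '__main__':
      start = time.time()
      for _ in range(50):
          t = threading.Thread(target=noop)
          t.start()
          t.join()
      print("threads:   {0:.3f}s".format(time.time() - start))

      start = time.time()
      for _ in range(50):
          p = multiprocessing.Process(target=noop)
          p.start()
          p.join()
      print("processes: {0:.3f}s".format(time.time() - start))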

Comparing multithreading and multiprocessing by example

1. Compute the Fibonacci sequence with both multiprocessing and multithreading; both variants rely on the thread/process pools provided by the concurrent.futures module.

  import time
  from concurrent.futures import ThreadPoolExecutor
  from concurrent.futures import ProcessPoolExecutor
  from concurrent.futures import as_completed

  def fib(n):
      return 1 if n <= 2 else fib(n-1) + fib(n-2)

  if __name__ == '__main__':
      # start the clock before submitting, so the measurement covers the whole run
      start_time = time.time()
      # with ProcessPoolExecutor(3) as executor:
      with ThreadPoolExecutor(3) as executor:
          all_task = [executor.submit(fib, n) for n in range(25, 35)]
          for future in as_completed(all_task):
              data = future.result()
              # todo
          end_time = time.time()
          print("time consuming by threads: {0}s".format(end_time-start_time))
          # print("time consuming by processes: {0}s".format(end_time-start_time))

Comparing the results of the two approaches:

  # result:
  # time consuming by threads: 4.823292016983032s
  # time consuming by processes: 3.3890748023986816s

As the numbers show, multiprocessing handles computation-heavy tasks more efficiently than multithreading. The example also shows that, through concurrent.futures, thread pools and process pools share the same interface and usage pattern. Note, however, that when using multiprocessing on Windows the relevant logic must be placed under the if __name__ == '__main__' guard; Linux has no such constraint.
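
Because the interface is identical, the executor class itself can even be treated as a parameter. A minimal sketch of that idea; the helper run_pool is a hypothetical name, and fib is the same function as in the example above:

  from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

  def fib(n):  # same function as in the example above
      return 1 if n <= 2 else fib(n-1) + fib(n-2)

  def run_pool(executor_cls, workers=3):  # hypothetical helper, not part of the original example
      # the identical code path serves both pool types
      with executor_cls(workers) as executor:
          all_task = [executor.submit(fib, n) for n in range(25, 35)]
          return [future.result() for future in all_task]

  if __name__ == '__main__':
      run_pool(ThreadPoolExecutor)
      run_pool(ProcessPoolExecutor)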

2. Simulate I/O-bound operations with both multiprocessing and multithreading. The defining trait of I/O is that the CPU spends a great deal of time waiting for data, so sleep() is enough to simulate it.

The overall structure stays the same; the modified logic is as follows:

  def random_sleep(n):
      time.sleep(n)
      return n

  ...

  # 8 workers, each task sleeping 2 seconds to simulate I/O
  with ProcessPoolExecutor(8) as executor:
  # with ThreadPoolExecutor(8) as executor:
      all_task = [executor.submit(random_sleep, 2) for i in range(30)]
  # result:
  # time consuming by threads: 8.002903699874878s
  # time consuming by processes: 8.34946894645691s

For I/O-bound work the two approaches take essentially the same time (30 tasks across 8 workers at 2 s each is four rounds, about 8 s); the slight edge for threads comes from their lower startup cost, so multithreading is the better fit here.

Multiprocess programming

Direct use

  import time
  import multiprocessing

  def read(times):
      time.sleep(times)
      print("process reading...")
      return "read for {0}s".format(times)

  def write(times):
      time.sleep(times)
      print("process writing...")
      return "write for {0}s".format(times)

  if __name__ == '__main__':
      read_process = multiprocessing.Process(target=read, args=(1,))
      write_process = multiprocessing.Process(target=write, args=(2,))
      read_process.start()
      write_process.start()
      print("read_process id {rid}".format(rid=read_process.pid))
      print("write_process id {wid}".format(wid=write_process.pid))
      read_process.join()
      write_process.join()
      print("done")

  # result:
  # read_process id 7064
  # write_process id 836
  # process reading...
  # process writing...
  # done

As you can see, multiprocessing logic is written in much the same way as its multithreading counterpart. Again, on Windows any process-related logic must be placed under if __name__ == '__main__'. For the remaining methods, consult the official documentation.
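
A brief sketch of a few of those methods and attributes on a Process object; the function work and the name "worker-1" are placeholder choices for illustration:

  import time
  import multiprocessing

  def work():  # placeholder task
      time.sleep(10)

  if __name__ == '__main__':
      p = multiprocessing.Process(target=work, name="worker-1")
      p.start()
      print(p.name, p.pid, p.is_alive())  # "worker-1", the child's pid, True
      p.terminate()                       # forcibly stop the child
      p.join()
      print(p.is_alive(), p.exitcode)     # False, and a nonzero exit code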

Using the native process pool

  import time
  import multiprocessing

  def read(times):
      time.sleep(times)
      print("process reading...")
      return "read for {0}s".format(times)

  def write(times):
      time.sleep(times)
      print("process writing...")
      return "write for {0}s".format(times)

  if __name__ == '__main__':
      # multiprocessing.cpu_count() returns the number of CPU cores
      pool = multiprocessing.Pool(multiprocessing.cpu_count())
      read_result = pool.apply_async(read, args=(2,))
      write_result = pool.apply_async(write, args=(3,))
      # close the pool so it accepts no new tasks; otherwise join() raises an error
      pool.close()
      # wait for all tasks submitted to the pool to finish
      pool.join()
      print(read_result.get())
      print(write_result.get())

  # result:
  # process reading...
  # process writing...
  # read for 2s
  # write for 3s

Using imap(), results are returned in the order the tasks were submitted:

  pool = multiprocessing.Pool(multiprocessing.cpu_count())
  for result in pool.imap(read, [2, 1, 3]):
      print(result)

  # result:
  # process reading...
  # process reading...
  # read for 2s
  # read for 1s
  # process reading...
  # read for 3s

Using imap_unordered(), whichever task finishes first has its result returned first:

  for result in pool.imap_unordered(read, [1, 5, 3]):
      print(result)

  # process reading...
  # read for 1s
  # process reading...
  # read for 3s
  # process reading...
  # read for 5s

Using ProcessPoolExecutor from concurrent.futures

This came up in the thread/process comparison earlier. Since it is used exactly the same way as the thread version, it is not repeated here; see the examples in the official documentation.
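
For completeness, a minimal sketch using executor.map(), which works like the built-in map() but distributes the calls across worker processes; the function square is a placeholder example:

  from concurrent.futures import ProcessPoolExecutor

  def square(n):  # placeholder task
      return n * n

  if __name__ == '__main__':
      # map() yields results in input order, like the built-in map()
      with ProcessPoolExecutor(max_workers=4) as executor:
          for result in executor.map(square, range(10)):
              print(result)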

Inter-process communication

Process communication differs somewhat from thread communication: the lock mechanisms and shared global variables available to threads no longer apply here, so we have to pick new tools for the job.
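
A minimal sketch of why globals fail across processes: each process works on its own copy of module-level state, so a change made in the child never reaches the parent (counter and increment are illustrative names):

  import multiprocessing

  counter = 0  # module-level global, illustrative name

  def increment():
      global counter
      counter += 1
      print("in child: {0}".format(counter))   # 1, the child's own copy

  if __name__ == '__main__':
      p = multiprocessing.Process(target=increment)
      p.start()
      p.join()
      print("in parent: {0}".format(counter))  # still 0 in the parent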

Using multiprocessing.Queue

Its usage logic is the same as that of the Queue in multithreading (the detailed methods are covered there). Note that this form of communication cannot be used with processes created through a Pool.

  import multiprocessing
  import time

  def plus(queue):
      for i in range(6):
          num = queue.get() + 1
          queue.put(num)
          print(num)
          time.sleep(1)

  def subtract(queue):
      for i in range(6):
          num = queue.get() - 1
          queue.put(num)
          print(num)
          time.sleep(2)

  if __name__ == '__main__':
      queue = multiprocessing.Queue(1)
      queue.put(0)
      plus_process = multiprocessing.Process(target=plus, args=(queue,))
      subtract_process = multiprocessing.Process(target=subtract, args=(queue,))
      plus_process.start()
      subtract_process.start()

  # result:
  # 1
  # 1
  # 2
  # 2
  # 3
  # 3
  # 0
  # 1
  # 2
  # 2
  # 1
  # 0

Using the Queue from Manager()

Manager() returns an object that handles synchronization between processes and offers several forms of data sharing across them.

  import multiprocessing
  import time

  def plus(queue):
      for i in range(6):
          num = queue.get() + 1
          queue.put(num)
          print(num)
          time.sleep(1)

  def subtract(queue):
      for i in range(6):
          num = queue.get() - 1
          queue.put(num)
          print(num)
          time.sleep(2)

  if __name__ == '__main__':
      queue = multiprocessing.Manager().Queue(1)  # note the somewhat unusual way it is created
      # queue = multiprocessing.Queue()  # a plain Queue no longer works here
      pool = multiprocessing.Pool(2)
      queue.put(0)
      pool.apply_async(plus, args=(queue,))
      pool.apply_async(subtract, args=(queue,))
      pool.close()
      pool.join()

  # result:
  # 0
  # 1
  # 1
  # 2
  # 2
  # 3
  # -1
  # 0
  # 1
  # 2
  # 1
  # 0

Using the list() from Manager()

Multiple processes can share a global list. Because it is shared across processes, a lock is needed to keep access to it safe; the Manager().Lock here is not the thread-level Lock from earlier, as it synchronizes between processes.

  import multiprocessing as mp
  import time

  def add_person(waiting_list, name_list, lock):
      lock.acquire()
      for name in name_list:
          waiting_list.append(name)
          time.sleep(1)
          print(waiting_list)
      lock.release()

  def get_person(waiting_list, lock):
      lock.acquire()
      if waiting_list:
          name = waiting_list.pop(0)
          print("get {0}".format(name))
      lock.release()

  if __name__ == '__main__':
      waiting_list = mp.Manager().list()
      lock = mp.Manager().Lock()  # use the lock to serialize access to the shared list
      name_list = ["MetaTian", "Rity", "Anonymous"]
      add_process = mp.Process(target=add_person, args=(waiting_list, name_list, lock))
      get_process = mp.Process(target=get_person, args=(waiting_list, lock))
      add_process.start()
      get_process.start()
      add_process.join()
      get_process.join()
      print(waiting_list)

  # result:
  # ['MetaTian']
  # ['MetaTian', 'Rity']
  # ['MetaTian', 'Rity', 'Anonymous']
  # get MetaTian
  # ['Rity', 'Anonymous']

Manager() contains more inter-process communication tools; see the official documentation.
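
Two of them sketched briefly: a shared dict and a shared Value. The function work, the key "status", and the 'i' integer type code are illustrative choices:

  import multiprocessing

  def work(shared_dict, shared_value):  # illustrative names
      shared_dict["status"] = "done"
      shared_value.value += 1

  if __name__ == '__main__':
      manager = multiprocessing.Manager()
      shared_dict = manager.dict()          # a dict shared between processes
      shared_value = manager.Value('i', 0)  # a shared integer ('i' type code)
      p = multiprocessing.Process(target=work, args=(shared_dict, shared_value))
      p.start()
      p.join()
      print(dict(shared_dict), shared_value.value)  # {'status': 'done'} 1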

Using Pipe

Pipe is only suitable for communication between two processes, but its performance is better than Queue's. Pipe() returns two Connection objects, which can be used to send and receive data between the processes, much like the socket objects covered earlier. See the documentation on Connection for details.

  import multiprocessing

  def plus(conn):
      for i in range(3):
          num = 0 if i == 0 else conn.recv()
          conn.send(num + 1)
          print("plus send: {0}".format(num+1))

  def subtract(conn):
      for i in range(3):
          num = conn.recv()
          conn.send(num-1)
          print("subtract send: {0}".format(num-1))

  if __name__ == '__main__':
      conn_plus, conn_subtract = multiprocessing.Pipe()
      plus_process = multiprocessing.Process(target=plus, args=(conn_plus,))
      subtract_process = multiprocessing.Process(target=subtract, args=(conn_subtract,))
      plus_process.start()
      subtract_process.start()

  # result:
  # plus send: 1
  # subtract send: 0
  # plus send: 1
  # subtract send: 0
  # plus send: 1
  # subtract send: 0

send() can send data continuously; recv() takes out, one by one, the data sent from the other end, and blocks waiting if no data is available.
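
When blocking is undesirable, Connection.poll() can check for readable data first. A minimal sketch, run in a single process just to show the calls:

  import multiprocessing

  if __name__ == '__main__':
      conn_a, conn_b = multiprocessing.Pipe()
      conn_a.send("hello")
      # poll() reports whether data is ready without blocking;
      # an optional timeout (in seconds) caps how long it waits
      if conn_b.poll(1):
          print(conn_b.recv())  # "hello"
      print(conn_b.poll())      # False: nothing left, recv() would now block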