进程:一个进程就是一个正在运行的程序,进程是操作系统进行资源分配的基本单位,每个进程都有自己独立的内存空间。同一时刻只能执行一个进程,所谓多进程是指CPU通过在进程间来回切换达到宏观上的多个进程共同执行的效果。

Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。

multiprocessing库

python3中对于进程的各种操作方法都在multiprocessing库中实现

multiprocessing库提供了一个Process类来代表一个进程对象。下面例子演示了启动一个子进程并等待其结束:

  1. from multiprocessing import Process
  2. import os
  3. #子进程要执行的代码
  4. def run_proc(name):
  5. print('Run child process %s(%s)'%(name,os.getppid()))
  6. if __name__=='__main__':
  7. print('Parent process %s '.os.getppid())
  8. p = Process(target = run_proc , args = ('test',))
  9. print('Child process will start')
  10. p.start()
  11. p.join()
  12. print('Child procss end.')

执行结果如下:

Parent process 8876.
Child process will start.
Run child process test (7572)...
Child process end.

根据上面的例子,要创建子进程,只需要在Process()方法target参数传入一个执行函数的函数名(如run_proc),然后在args或者kwargs参数传入前面函数需要的各项参数,通过创建一个Process实例,用start()方法启动。join()方法等待子进程运行结束后再继续往下执行,通常用于进程的同步。

进程池

如果要启动大量的子进程,可以使用进程池的方式批量创建子进程。

用Process库的Pool对象来建立一个进程池,进程池中进程的最大数量根据输入的数字确定。对Pool对象调用join()方法会使程序等待所有子进程执行完毕后,再往后执行。调用join()方法之前必须先调用close()方法停止向进程池中放入新的进程。

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task,args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')

执行结果:

Parent process 3620.
Waiting for all subprocesses done...
Run task 0 (7952)...
Run task 1 (5212)...
Run task 2 (9788)...
Run task 3 (6080)...
Task 1 runs 0.80 seconds.
Run task 4 (5212)...
Task 0 runs 1.31 seconds.
Task 2 runs 1.50 seconds.
Task 4 runs 0.73 seconds.
Task 3 runs 2.29 seconds.
All subprocesses done.

输出的结果task0 ,1 ,2,3是立刻执行的。而task4要等待前面某个task完成后才继续执行,这是因为Pool的默认大小被设置为4,因此最多同时执行4个进程。

Process库中Pool类中的方法:

  • apply(func[, args[, kwds]]) :使用arg和kwds参数调用func函数,结果返回前会一直阻塞,由于这个原因,apply_async()更适合并发执行,另外,func函数仅被pool中的一个进程运行。

  • apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一个变体,会返回一个结果对象。如果callback被指定,那么callback可以接收一个参数然后被调用,当结果准备好回调时会调用callback,调用失败时,则用error_callback替换callback。 Callbacks应被立即完成,否则处理结果的线程会被阻塞。

  • close() : 阻止更多的任务提交到pool,待任务完成后,工作进程会退出。

  • terminate() : 不管任务是否完成,立即停止工作进程。在对pool对象进程垃圾回收的时候,会立即调用terminate()。

  • join() : wait工作线程的退出,在调用join()前,必须调用close()或者terminate()。这样是因为被终止的进程需要被父进程调用wait(join等价于wait),否则进程会成为僵尸进程。

  • map(func, iterable[, chunksize])

  • map_async(func, iterable[, chunksize[, callback[, error_callback]]])

  • imap(func, iterable[, chunksize])

  • imap_unordered(func, iterable[, chunksize])

  • starmap(func, iterable[, chunksize])

  • starmap_async(func, iterable[, chunksize[, callback[, error_back]]])

子进程

很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。

subprocess模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出。

下面的例子演示了如何在Python代码中运行命令nslookup www.python.org,这和命令行直接运行的效果是一样的:


import subprocess

print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)

运行结果:

$ nslookup www.python.org
Server:        192.168.19.4
Address:    192.168.19.4#53

Non-authoritative answer:
www.python.org    canonical name = python.map.fastly.net.
Name:    python.map.fastly.net
Address: 199.27.79.223

Exit code: 0

如果子进程还需要输入,则可以通过communicate()方法输入:

import subprocess

print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)

上面的代码相当于在命令行执行命令nslookup,然后手动输入:

set q=mx
python.org
exit

运行结果如下:

$ nslookup
Server:        192.168.19.4
Address:    192.168.19.4#53

Non-authoritative answer:
python.org    mail exchanger = 50 mail.python.org.

Authoritative answers can be found from:
mail.python.org    internet address = 82.94.164.166
mail.python.org    has AAAA address 2001:888:2000:d::a6


Exit code: 0

进程间的通信(生产者-消费者模型)

Python的multiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。

我们以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__=='__main__':
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    pr.terminate()

运行结果:

Process to read: 7744
Process to write: 6152
Put A to queue...
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

Queue 就是对队列,它是线程安全的.

Queue模块:
import queue

q = queue.Queue(maxsize=0) # 构造一个先进显出队列,maxsize指定队列长度,为0 时,表示队列长度无限制。

  • q.join():等到队列为空的时候,往后执行
  • q.qsize():返回队列的大小 (不可靠)
  • q.empty():当队列为空的时候,返回True 否则返回False (不可靠)
  • q.full() :当队列满的时候,返回True,否则返回False (不可靠)
  • q.put(item, block=True, timeout=None) : 将item放入Queue尾部,item必须存在,可以参数block默认为True,表示当队列满时,会等待队列给出可用位置,为False时为非阻塞,此时如果队列已满,会引发queue.Full 异常。可选参数timeout,表示会阻塞设置的时间,过后,如果队列无法给出放入item的位置,则引发 queue.Full 异常
  • q.get(block=True, timeout=None) : 移除并返回队列头部的一个值,可选参数block默认为True,表示获取值的时候,如果队列为空,则阻塞,为False时,不阻塞,若此时队列为空,则引发 queue.Empty异常。 可选参数timeout,表示会阻塞设置的时候,过后,如果队列为空,则引发Empty异常。
  • q.put_nowait(item): 等效于 put(item,block=False)
  • q.get_nowait() : 等效于 get(item,block=False)

在Unix/Linux下,multiprocessing模块封装了fork()调用,使我们不需要关注fork()的细节。由于Windows没有fork调用,因此,multiprocessing需要“模拟”出fork的效果,父进程所有Python对象都必须通过pickle序列化再传到子进程去,所以,如果multiprocessing在Windows下调用失败了,要先考虑是不是pickle失败了。