Linux相关命令
- cat /proc/cpuinfo:查看CPU个数
- ps命令:查看活动进程
- ps -aux:查看所有的进程
- ps -aux| grep aa:查看指定进程
- ps -elf:查看父子进程关系
- ps elf| grep aa:查看指定的父子进程关系
- top:显示前20条进程,会动态改变,按q退出,详细说明可看课件
- bg 作业ID:把暂停的该进程拉到后台运行,ctrl + z可以把进程暂停
- fg 作业ID:把后台进程拉到前台
- jobs:查看后台任务
- kill -l:查看系统的所有信号
- kill -9 进程号:杀死某个进程
- pkill -f a:杀死进程名为a的进程
- free:查看物理内存
- fdisk -l:查看磁盘及磁盘分区情况
-
创建子进程
需要使用multiprocessing模块,这个模块是跨平台版本的多进程模块,提供了一个Process类来代表一个进程对象,这个对象可以理解为是一个独立的进程,可以执行另外的事情
- 创建子进程的方法:首先创建一个Process类的实例,可以只传入一个所执行函数的参数,然后用start()方法启动该进程
- Process类的参数说明:Process(group, target, name, args, kwargs)
- group:指定进程组,大多数情况下用不到
- target:所创建子进程所执行的函数
- name:给进程设定一个名字,可以不设定
- args:给target指定的函数传递的参数,以元组的方式传递
- kwargs:给target指定的函数传递的参数,以字典的方式传递
- Process类的实例对象的常用方法
- start():启动子进程实例
- is_alive():判断子进程是否还活着
- join([timeout]):如果子进程结束了,等待timeout后回收子进程尸体
- terminate():立即结束子进程
- os.getpid():获取进程的pid
- os.getppid():获取父进程的pid
- 注意:创建子进程时,是把父进程的资源复制一份给子进程,因此在子进程中对全局变量的修改不会影响父进程中的全局变量的值
- 孤儿进程:父进程结束了,子进程还未结束,子进程变为孤儿进程
- 僵尸进程:子进程结束了,父进程尚未回收子进程,子进程变为僵尸进程 ```python from multiprocessing import Process import time
def run_proc(): while True: print(‘子进程’) time.sleep(1)
if name == ‘main‘: p = Process(target=run_proc) p.start() while True: print(‘父进程’) time.sleep(1)
```python
from multiprocessing import Process
import os
def run_proc(name, age, dict1, **kwargs):
print('子进程运行中, name = %s, age = %d, pid = %d' % (name, age, os.getpid()))
print(dict1)
print(kwargs)
if __name__ == '__main__':
p = Process(target=run_proc, args=('ZYJ', 22, {'111': 1}), kwargs={'222': 2})
p.start()
from multiprocessing import Process
import os
import time
nums = [11, 22]
def work():
print("in son process, pid = %d, nums = %s" % (os.getpid(), nums))
for i in range(3):
nums.append(i)
print("in son process, pid = %d, nums = %s" % (os.getpid(), nums))
time.sleep(1)
if __name__ == '__main__':
print("in main process, pid = %d, nums = %s" % (os.getpid(), nums))
p = Process(target=work)
p.start()
p.join()
print("in main process, pid = %d, nums = %s" % (os.getpid(), nums))
进程间通信
可以使用multiprocessing模块中的Queue类实现多进程之间的数据传递,Queue的用法如下
- q = Queue([n]):初始化Queue对象,若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接收的消息数量没有上限
- q.put(item[, block][, timeout]):将item消息写入队列,block的默认值为True
- 如果block使用默认值True
- 且没有设置timeout(单位为秒),当消息队列没有空间可写入时,程序将被阻塞(停在写入状态),直到消息队列腾出空间为止
- 如果设置了timeout,当消息队列没有空间可写入时,则会等待timeout秒,若还没有空间,则抛出”Queue.Full”异常
- 如果block的值为False,当消息队列没有空间可写入时,则会立刻抛出”Queue.Full”异常
- 如果block使用默认值True
- q.put_nowait(item):相当于q.put(item, False)
- q.get([block][, timeout]):获取队列中的一条信息,然后将其从队列中移除,block的默认值为True
- 如果block使用默认值True
- 且没有设置timeout(单位为秒),当消息队列为空时,程序将被阻塞(停在读取状态),直到从消息队列读出消息为止
- 如果设置了timeout,当消息队列为空时,则会等待timeout秒,若还没读取到任何消息,则会抛出”Queue.Empty”异常
- 如果block的值为False,当消息队列为空时,则会立刻抛出”Queue.Empty”异常
- 如果block使用默认值True
- q.get_nowait():相当于q.get(False)
- q.qsize():返回当前队列包含的消息数量
- q.empty():判断当前队列是否为空
- q.full():判断当前队列是否满了 ```python from multiprocessing import Process, Queue import time
def writer(q: Queue): for value in [‘A’, ‘B’, ‘C’]: q.put(value) # 写入消息队列,如果队列没有空间,程序将被阻塞 print(‘put %s into queue’ % value) time.sleep(1)
def reader(q: Queue): while True: if not q.empty(): value = q.get(True) # 读取消息队列,如果队列为空,程序将被阻塞 print(‘get %s from queue’ % value) time.sleep(1.5) else: break
if name == ‘main‘: q = Queue() pw = Process(target=writer, args=(q,)) pw.start()
pr = Process(target=reader, args=(q,))
pr.start()
pw.join()
pr.join()
<a name="IajSV"></a>
## 进程池Pool
- 当需要创建的子进程数量不多时,可以直接使用multiprocessing模块中的Process类手动创建多个子进程,但如果需要创建上百甚至上千个子进程,手动创建子进程的工作量巨大,此时就可以使用multiprocessing模块提供的Pool方法
- 初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果进程池还没有满,那么就会创建一个新的进程用来执行该请求;但如果进程池中的进程数已经达到指定的最大值,那么该请求就会等待,直到进程池中有进程结束,才会用之前的进程来执行新的请求
- multiprocessing.Pool的常用方法
- po = Pool(n):指定进程池中的最大进程数,如果不指定则表示没有上限
- po.apply_async(func[, args]):使用非阻塞方式调用func(并行执行,阻塞方式必须等待上一个进程退出才能执行下一个进程),args为传递给func的参数列表
- po.close():关闭Pool,使其不再接受新的请求,但不会终止正在运行的进程
- po.terminate():关闭Pool,不管请求是否完成,立即终止所有的进程
- po.join():主进程阻塞,等待子进程的退出,必须在close或terminate之后使用
- 进程池中的Queue:需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue()
```python
from multiprocessing.pool import Pool
import os
import random
import time
def worker(msg):
print("%s号子进程开始执行,pid为%d" % (msg, os.getpid()))
time.sleep(random.random() * 2) # random.random()随机生成0~1之间的浮点数
print("%s号子进程执行结束,pid为%d" % (msg, os.getpid()))
if __name__ == '__main__':
print("主进程开始执行")
po = Pool(3)
for i in range(5):
po.apply_async(worker, (i,))
po.close()
# po.terminate()
po.join()
print("主进程执行结束")
from multiprocessing import Manager, Pool
import time
def writer(q):
for value in ['A', 'B', 'C']:
q.put(value) # 写入消息队列,如果队列没有空间,程序将被阻塞
print('put %s into queue' % value)
time.sleep(1)
def reader(q):
while True:
if not q.empty():
value = q.get(True) # 读取消息队列,如果队列为空,程序将被阻塞
print('get %s from queue' % value)
time.sleep(1.5)
else:
break
if __name__ == "__main__":
q = Manager().Queue()
po = Pool()
po.apply_async(writer, (q,))
po.apply_async(reader, (q,))
po.close()
po.join()