什么是进程

  1. 进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。

狭义定义:进程是正在运行的程序的实例。

广义定义:进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动。

同一个程序执行两次,就会在操作系统中出现两个进程,所以我们可以同时运行一个软件,分别做不同的事情也不会混乱

进程调度

什么是进程调度

  1. 进程调度是指操作系统按照某种策略或规则选择进程占用CPU进行运行的过程

进程调度的算法

  1. 要想多个进程交替运行,操作系统必须对这些进程进行调度,这个调度不是随即进行的,而是需要遵循一定的法则,由此就有了进程的调度算法

先进先出算法

  1. 先进先出算法(FCFS)是一种最简单的调度算法,该算法既可用于作业调度,也可用于进程调度。FCFS算法比较有利于长作业(进程),而不利于短作业(进程)。由此可知,本算法适合于CPU繁忙型作业,而不利于I/O繁忙型的作业(进程)

短进程优先

  1. 短作业(进程)优先调度算法(SJ/PF)是指对短作业或短进程优先调度的算法,该算法既可用于作业调度,也可用于进程调度。但其对长作业不利;不能保证紧迫性作业(进程)被及时处理;作业的长短只是被估算出来的。

时间片轮转法

  1. 时间片轮转法(RR)是专门为分时系统设计的,类似于FCFS调度,但增加了抢占以切换进程。将一个较小时间单元(通常10ms~100ms)定义为时间片。就绪队列作为循环队列,CPU调度程序循环整个就绪队列,为每个进程分配不超过一个时间片的CPU执行。时间片到,中断操作系统,进行上下文切换。时间片轮转法(RR)调度的平均等待时间通常较长

多级队列调度

  1. 多级队列调度算法将就绪队列分成多个单独队列,根据进程属性,如内存大小,进程优先级、进程类型等,一个进程永久分到一个队列。每个队列有自己的调度算法。每个队列与更低层队列相比有绝对的优先

多级反馈队列

  1. 多级表示有多个队列,每个队列优先级从高到低,同时优先级越高时间片越短。反馈表示如果有新的进程加入优先级高的队列时,立刻停止当前正在运行的进程,转而去运行优先级高的队列;

补充:进程调度算法,目的就是为了能够让单核的计算机也能够做到运行多个程序

并发与并行

并发

一个处理器同时处理多个任务

形象的比喻:用一个奶瓶给三个孩子轮流喂奶,这叫并发

多进程编程 - 图1

并行

多个处理器或者是多核的处理器同时处理多个不同的任务

形象的比喻:用三个奶瓶分别给三个孩子喂奶,这叫并行

多进程编程 - 图2

同步与异步

同步

同步就是指一个进程在执行某个请求的时候,若该请求需要一段时间才能返回信息,那么这个进程将会一直等待下去,直到收到返回信息才继续执行下去。

异步

异步是指进程不需要一直等下去,而是继续执行下面的操作,不管其他进程的状态。当有消息返回时系统会通知进程进行处理,这样可以提高执行的效率。

阻塞与非阻塞

进程三状态

就绪态:当进程已分配到除CPU以外的所有必要的资源,只要获得处理机便可立即执行,这时的进程状态称为就绪状态。

运行态:执行/运行(Running)状态当进程已获得处理机,其程序正在处理机上执行,此时的进程状态称为执行状态。

阻塞态:阻塞(Blocked)状态正在执行的进程,由于等待某个事件发生而无法执行时,便放弃处理机而处于阻塞状态。引起进程阻塞的事件可有多种,例如,等待I/O完成、申请缓冲区不能满足、等待信件(信号)等。

阻塞

阻塞调用是指调用结果返回之前,当前线程会被挂起(线程进入非可执行状态,在这个状态下,cpu不会给线程分配时间片,即线程暂停运行)。函数只有在得到结果之后才会返回

非阻塞

非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线

python创建进程

方式一

  1. from multiprocessing import Process
  2. import time
  3. def task(name):
  4. print('Hello', name)
  5. time.sleep(3)
  6. print('我是子进程')
  7. if __name__ == '__main__':
  8. p = Process(target=task, args=('kevin',)) # 创建一个进程对象
  9. p.start() # 创建一个新的进程,异步操作
  10. print("我是主进程")
  11. # 我是主进程
  12. # Hello kevin
  13. # 我是子进程

强调:

  • windows中创建进程是以导入模块的方式进行 所以创建进程的代码必须写在__main__子代码中否则会直接报错 因为在无限制创建进程
  • LinuxMac中创建进程是直接拷贝一份源代码然后执行 不需要写__main__子代码中

方式二

  1. from multiprocessing import Process
  2. import time
  3. class MyProcess(Process):
  4. def __init__(self, username):
  5. self.username = username
  6. super().__init__()
  7. def run(self):
  8. print("Hello,World")
  9. time.sleep(3)
  10. print(self.username)
  11. if __name__ == '__main__':
  12. p = MyProcess('kevin')
  13. p.start()
  14. print("我是主进程")
  15. # 我是主进程
  16. # Hello,World
  17. # kevin

进程实现并发

思路:将与客户端通信的代码封装成一个函数,之后每来一个客户端就创建一个进程专门做交互

服务端

  1. import socket
  2. from multiprocessing import Process
  3. def get_server():
  4. server = socket.socket()
  5. server.bind(('127.0.0.1', 8080))
  6. server.listen(5)
  7. return server
  8. # 将服务客户端的代码封装成函数(通信代码)
  9. def talk(sock):
  10. while True:
  11. data = sock.recv(1024)
  12. print(data.decode('utf8'))
  13. sock.send(data.upper())
  14. if __name__ == '__main__':
  15. server = get_server()
  16. while True:
  17. sock, addr = server.accept()
  18. p = Process(target=talk, args=(sock,))
  19. p.start()

客户端

  1. import socket
  2. client = socket.socket()
  3. client.connect(('127.0.0.1', 8080))
  4. while True:
  5. client.send(b'hello world')
  6. data = client.recv(1024)
  7. print(data.decode('utf8'))

进程对象属性和方法

join方法

让主进程等待子进程运行完毕再执行

  1. from multiprocessing import Process
  2. import time
  3. def task(name):
  4. print('Hello', name)
  5. time.sleep(3)
  6. print('我是子进程')
  7. if __name__ == '__main__':
  8. p = Process(target=task, args=('kevin',))
  9. p.start()
  10. p.join()
  11. print('主进程')
  12. # Hello kevin
  13. # 我是子进程
  14. # 主进程

真正理解过程

  1. from multiprocessing import Process
  2. import time
  3. def task(name, n):
  4. time.sleep(n)
  5. if __name__ == '__main__':
  6. p1 = Process(target=task, args=('kevin', 1))
  7. p2 = Process(target=task, args=('kevin', 2))
  8. p3 = Process(target=task, args=('kevin', 3))
  9. start_time = time.time()
  10. p1.start()
  11. p2.start()
  12. p3.start()
  13. p1.join()
  14. p2.join()
  15. p3.join()
  16. end_time = time.time() - start_time
  17. print('主进程', f'总耗时:{end_time}')
  18. # 主进程 总耗时:3.010664939880371
  1. 执行此过程中,首先分别创建p1p2p3三个进程对象,然后开始计时,然后开始分别创建p1p2p3进程,此时三个进程已经分别开始执行各自的子代码也就是并发,执行到`p1.join()`要等p1的子代码执行完毕后才能执行下面的代码,当执行到`p2.join()`因为是并发,走时间是一样的,已经因为p1走了1秒所以p2再需要走1秒就可以走完他的子代码,同理当执行到`p3.join()`,已经走了两秒了,也是一样的再走一秒就结束了。所有是一共耗时三秒

如果是一个start一个join交替执行 那么总耗时就是各个任务耗时总和,start决定进程是否开始

查看进程号的方法

  1. from multiprocessing import current_process
  2. current_process().pid
  1. import os
  2. os.getpid() # 获取当前进程的进程号
  3. os.getppid() # 获取当前进程的父进程号

杀死子进程的方法

terminate()

  1. from multiprocessing import Process
  2. import time
  3. def task():
  4. time.sleep(3)
  5. print("Hello")
  6. if __name__ == '__main__':
  7. p = Process(target=task)
  8. p.start() # 需要一点点时间
  9. p.terminate() # 需要一点点时间(销毁数据,回收数据)

判断子进程是否存活

is_alive()

  1. from multiprocessing import Process
  2. import time
  3. def task():
  4. time.sleep(3)
  5. print("Hello")
  6. if __name__ == '__main__':
  7. p = Process(target=task)
  8. p.start()
  9. p.terminate()
  10. time.sleep(0.1)
  11. print(p.is_alive())
  12. # False

僵尸进程与孤儿进程

僵尸进程

  1. 子进程退出了,但是父进程没有用waitwaitpid去获取子进程的状态信息,那么子进程的进程描述符(包括进程号 PID,退出状态 the termination status of the process,运行时间 the amount of CPU time taken by the process 等)仍然保存在系统中,这种进程称为僵尸进程。

回收子进程资源的方式,父进程自动结束,调用join方法

孤儿进程

  1. 父进程结束了,而它的一个或多个子进程还在运行,那么这些子进程就成为孤儿进程(father died)。子进程的资源由init进程(进程号PID = 1)回收。

守护进程

  1. 守护进程是系统中生存期较长的一种进程,常常在系统引导装入时启动,在系统关闭时终止,没有控制终端,在后台运行。守护进程脱离于终端是为了避免进程在执行过程中的信息在任何终端上显示并且进程也不会被任何终端所产生的终端信息所打断。
  1. from multiprocessing import Process
  2. import time
  3. def task():
  4. print('活着')
  5. time.sleep(3)
  6. print("死了")
  7. if __name__ == '__main__':
  8. p = Process(target=task)
  9. # 必须写在start前面
  10. p.daemon = True # 将子进程设置为守护进程,主进程结束,子进程立刻结束
  11. p.start()
  12. time.sleep(0.1)
  13. print("Hello World")
  14. # 活着
  15. # Hello World

互斥锁(重要)

锁的作用就是某个线程 在访问某个资源时先锁住,防止其它线程的访问,等访问完毕解锁后其他线程再来加锁进行访问

  1. from multiprocessing import Lock
  2. # 创建互斥锁
  3. lock = threading.Lock()
  4. # 对需要访问的资源加锁
  5. lock.acquire()
  6. # 资源访问结束解锁
  7. lock.release() # 放锁

互斥锁应用

引言

  1. 每逢节假日抢票,手机上明明显示还有余票,但是点击购买的时候却提示已经没有票了,之后回到查询页面发现确实显示没有票了,上午10:00打开买票软件查看票数系统给你发过来的是10:00对应的数据,只要页面不刷新不点击下一步,那么页面数据永远展示的是10:00的。

操作

  1. 当多个进程操作同一份数据的时候会造成数据的错乱,这个时候需要加锁处理(互斥锁),将并发变成串行,牺牲了效率但是保证的数据的安全,互斥锁不要轻易使用,容易造成死锁现象。互斥锁只在处理数据的部分加锁,不能什么地方都加,会严重影响程序的效率
  1. import json
  2. from multiprocessing import Process, Lock
  3. import time
  4. import random
  5. # 查票
  6. def search(name):
  7. with open(r'ticket_data.json', 'r', encoding='utf8') as f:
  8. data = json.load(f)
  9. print(f'{name}查询当前余票:%s' % data.get('ticket_num'))
  10. # 买票
  11. def buy(name):
  12. """
  13. 点击买票是需要再次查票的 因为期间其他人可能已经把票买走了
  14. """
  15. # 1.查票
  16. with open(r'ticket_data.json', 'r', encoding='utf8') as f:
  17. data = json.load(f)
  18. time.sleep(random.randint(1, 3))
  19. # 2.判断是否还有余票
  20. if data.get('ticket_num') > 0:
  21. data['ticket_num'] -= 1
  22. with open(r'ticket_data.json', 'w', encoding='utf8') as f:
  23. json.dump(data, f)
  24. print(f'{name}抢票成功')
  25. else:
  26. print(f'{name}抢票失败 没有余票了')
  27. def run(name, mutex):
  28. search(name)
  29. # 把买票环节将并发变成串行,牺牲了效率但是保证的数据的安全
  30. mutex.acquire() # 抢锁
  31. buy(name)
  32. mutex.release() # 放锁
  33. # 模拟多人同时抢票
  34. if __name__ == '__main__':
  35. # 互斥锁在主进程中产生一把 交给多个子进程用
  36. mutex = Lock()
  37. for i in range(1, 10):
  38. p = Process(target=run, args=('用户:%s' % i, mutex))
  39. p.start()

补充:

  • 行锁针对行数据加锁 同一时间只能一个人操作
  • 表锁:针对表数据加锁 同一时间只能一个人操作
  • 悲观锁:对于同一个数据的并发操作,悲观锁认为自己在使用数据的时候一定有别的线程来修改数据,因此在获取数据的时候会先加锁,确保数据不会被别的线程修改
  • 乐观锁:相对悲观锁而言的,乐观锁假设数据一般情况不会造成冲突,所以在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,如果冲突,则返回给用户异常信息,让用户决定如何去做。乐观锁适用于读多写少的场景,这样可以提高程序的吞吐量

锁的应用范围很广,但是核心都是为了保证数据的安全

进程间数据默认隔离

在同一台计算机上的多个应用程序在内存中是相互隔离的(物理级别隔离)

  1. from multiprocessing import Process
  2. a = 999
  3. def task():
  4. global a
  5. a = 666
  6. if __name__ == '__main__':
  7. p = Process(target=task)
  8. p.start()
  9. p.join() # 确保子进程代码运行结束再打印a
  10. print(a)
  11. # 999

消息队列(内置队列)

创建

  1. from multiprocessing import Queue
  2. queue = Queue(队列长度)

方法

方法 描述
put 变量名.put(数据),放入数据(如队列已满,则程序进入阻塞状态,等待队列取出后再放入)
put_nowait 变量名.put_nowati(数据),放入数据(如队列已满,则不等待队列信息取出后再放入,直接报错)
get 变量名.get(数据),取出数据(如队列为空,则程序进入阻塞状态,等待队列防如数据后再取出)
get_nowait 变量名.get_nowait(数据),取出数据(如队列为空,则不等待队列放入信息后取出数据,直接报错),放入数据后立马判断是否为空有时为True,原因是放入值和判断同时进行
qsize 变量名.qsize(),消息数量
empty 变量名.empty()(返回值为True或False),判断是否为空
full 变量名.full()(返回值为True或False),判断是否为满

使用

  1. from multiprocessing import Queue
  2. queue = Queue(5) # 自定义队列的长度
  3. queue.put(111) # 向队列中存放数据
  4. queue.put(222)
  5. print(queue.full()) # 判断队列是否满了
  6. # False
  7. queue.put(333) # 向队列中存放数据
  8. queue.put(444)
  9. queue.put(555)
  10. print(queue.full()) # 判断队列是否满了
  11. # True
  12. # queue.put(666) # 超出最大长度 原地阻塞等待队列中出现空位
  13. print(queue.get())
  14. # 111
  15. print(queue.get())
  16. # 222
  17. print(queue.get())
  18. # 333
  19. print(queue.empty()) # 判断队列是否空了
  20. # False
  21. print(queue.get())
  22. # 444
  23. print(queue.get())
  24. print(queue.empty()) # 判断队列是否空了
  25. # True
  26. # print(queue.get()) # 队列中没有值 继续获取则阻塞等待队列中给值
  27. print(queue.get_nowait())

补充:上述方法并不能够在并发场景下精准使用,用途在于让进程与进程之间数据通信

IPC机制(进程通信)

  1. 因为进程间不共享全局变量,所以使用队列进行数据通信,可以在父进程中创建两个子进程,一个往队列里写数据,另一个从队列里取出数据,实现不同内存空间中的进程,数据交互
  1. from multiprocessing import Queue, Process
  2. def write_queue(queue):
  3. queue.put("子进程write_queue往队列中添加值")
  4. def read_queue(queue):
  5. print("子进程read_queue从队列中取值>>>:", queue.get())
  6. if __name__ == '__main__':
  7. queue = Queue()
  8. p1 = Process(target=write_queue, args=(queue,))
  9. p2 = Process(target=read_queue, args=(queue,))
  10. p1.start()
  11. p2.start()

生产者消费者类型

  1. 生产者就是生产数据,消费者就是消费数据。比如,爬取网页的代码可以称之为生产者,对数据筛选的代码可以称之为消费者
  1. import random
  2. from multiprocessing import Process, Queue, JoinableQueue
  3. import time
  4. def producer(name, food, queue):
  5. for i in range(1, 11):
  6. data = f'{name},生产了{food},一共生产了{i}份'
  7. time.sleep(2)
  8. print(data)
  9. queue.put(food)
  10. def consumer(name, queue):
  11. while True:
  12. # 不能用empty去结束,因为是多个进程在操作
  13. # if.queue.empty():break
  14. food = queue.get()
  15. # if not food:
  16. # break
  17. time.sleep(2)
  18. print(f"{name},吃了{food}")
  19. queue.task_done() # 每次去完数据必须给队列一个反馈
  20. if __name__ == '__main__':
  21. # queue = Queue()
  22. queue = JoinableQueue() # 会自己计算里面有没有值
  23. p1 = Process(target=producer, args=('kevin', '土豆丝', queue))
  24. p2 = Process(target=producer, args=('jason', '麻婆豆腐', queue))
  25. c3 = Process(target=consumer, args=('tony', queue))
  26. c4 = Process(target=consumer, args=('jerry', queue))
  27. c3.daemon = True
  28. c4.daemon = True
  29. p1.start()
  30. p2.start()
  31. c3.start()
  32. c4.start()
  33. # 生产者生产完所以数据之后,往队列中添加结束信号
  34. p1.join()
  35. p2.join()
  36. # queue.put(None) # 结束信号的个数要跟消费者的个数一致才行
  37. # queue.put(None)
  38. """队列中其实已经自己加了锁 所以多进程取值也不会冲突 并且取走了就没了"""
  39. queue.join() # 等待队列中数据全部被取出(一定要让生产者全部结束才能判断正确)
  40. """执行完上述的join方法表示消费者也已经消费完数据了"""

其实需要考虑的问题其实就是供需平衡的问题,生产力与消费力要均衡