z操作系统的发展史
进程
对于同步和异步之间的理解
- 同步
同步的意思是指两个进程的运行是相关的,其中一个进程要阻塞等待另外一个进程的运行(假设在银行排队,如果是同步阻塞的话,在银行排队的过程中不可以做任何的事情,需要等待银行排队完之后才可以进入下一个步骤)
- 异步
异步的意思是两个进程之间毫不相关(不需要相互等待),自己运行自己的(在银行排队,在排队的同时可以玩手机回消息)
阻塞和非阻塞状态
- 同步阻塞状态:效率最低。专心排队,别的什么事情都不可以做
- 异步阻塞状态:两个进程阻塞了一个但是不影响另外一个进程的运行
- 同步非阻塞:两个进程之间相互自由切换
-
进程
from multiprocessing import process 可以创建多进程
一种方式是直接通过调用函数的形式来创建进程
一种方式是通过继承父类的方式来创建进程
p.start():开始执行子进程守护进程
当主进程运行结束的时候,子进程就会停止
p.dameon = True:可以设置子进程为守护进程,但是必须要在p.start之前设置其为守护进程
在linux当中也有很多的守护进程,它们的父进程都是systemd进程,而systemd进程是开机启动之后的第一个进程,可以达到一种效果lock
当多进程操作同一份数据的时候,会产生数据安全性问题,可以通过对任务加锁的形式解决来保证数据的安全性。
———————-正式笔记阶段—————————-
进程
什么是进程
进程是计算机中程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础
狭义定义中:进程是正在运行的程序的实例。
广义定义:进程是具有某一个功能的程序关于某个数据集合的一次运行活动,是操作系统动态执行的基本单元,在传统的操作系统中,进程既是基本的分配单元,也是基本的执行单元进程的详细解释
进程是一个实体,每一个进程都有它自己的地址空间,一般情况下,包括文本区域、数据区域、堆栈。文本区域存储处理器执行的代码;数据区域存储变量和进程执行期间使用的动态分配的内存,堆栈区域存储着活动过程调用的指令和本地变量
- 进程是一个“执行中的程序”,程序是一个没有生命的实体,只有处理器执行该程序的时候,它才成为一个活动的实体,我们称之为进程
所有多道程序设计操作系统都建立在进程的基础上
进程的特点:
- 动态性:进程是程序在多道程序系统中的一次执行过程,进程是动态产生的,动态消亡的
- 并发性:任何进程可以同其他的进程一起并发执行
- 独立性:进程是一个可以独立运行的基本单位,同时也是系统分配和调度资源的独立单位
- 异步性:由于进程之间是相互制约的,使进程具有执行的间断性,即进程按各自独立的、不可预知的速度向前推进
- 结构特征:进程由程序数据和进程控制块三部分组成
- 多个不同的进程可以包含相同的程序:一个程序在不同的数据集里就构成不同的进程;能得到不同的结果,但是在执行过程中,程序不可以被改变
-
进程调用的多种方式
先来先服务(FCFS)调度算法
- 短作业(进程)优先调度算法
- 时间片轮转(Round Robin,RR)算法
-
进程的并行与并发
并发:并发是指资源在有限的情况下,两者交替轮流使用资源,比如一段路(单核CPU资源)同时只能通过一个进程。A走一段时间之后,让给B,B用完资源之后给A,交替使用资源,以达到提高效率的目的
- 并行是从微观上的,也就是在一个精确的时间片刻,有不同的程序在运行,这就必须要求有多个处理器
并发是宏观上的,在一个时间段内,可以看出是同时执行的,比如在一个服务器上同时处理多个session
同步异步的阻塞与非阻塞
进程的状态的介绍
就绪状态:进程分配到除CPU以外的所有必要的资源,等待执行,等待获得处理机
- 执行状态:进程已经获得处理机,程序正在运行
阻塞状态:正在运行的进程,由于等待某个事件而无法执行,便放弃处理机处于阻塞状态,例如等待I/O完成,申请缓冲区不能满足
同步和异步
同步:一个任务的完成需要依赖于另外一个任务,是一种可靠的任务序列,两个任务的状态可以保持一致
- 异步:不需要等待被依赖的任务的完成,是不可靠任务序列
阻塞和非阻塞
同步/异步与阻塞/非阻塞
- 同步阻塞状态:效率最低。专心排队,别的什么事情都不可以做
- 异步阻塞状态:两个进程阻塞了一个但是不影响另外一个进程的运行
- 同步非阻塞:两个进程之间相互自由切换
-
进程的创建和退出
创建新的进程的四种方式
系统初始化,运行在后台的进程只有在需要的时候才会被唤醒
- 一个进程在运行过程中开启了子进程
- 用户的交互性请求
-
进程的结束
正常退出:自愿,一般是用户自行操作的进程的结束
- 出错退出:
- 严重错误:非自愿,在python程序中的逻辑错误产生的退出,可以使用try和except语句进行错误的捕获和处理
- 被其他进程杀死:非自愿,如kill-9
多进程使用的模块
multiprocess模块的常用方法
p.start
p.start()
启动进程,并调用该子进程中的p.run()
p.run() 进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类 中一定要实现该方法p.terminate()
强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就 成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那 么也将不会被释放,进而导致死锁p.is_alive()
如果p仍然运行,返回True
p.join([timeout])
主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状 态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开 启的进程,而不能join住run开启的进程p.daemon
默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止 时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在 p.start()之前设置 p.name 进程的名称p.pid
进程的pid,进程在内存之中的地址p.exitcode
进程在运行时为None、如果为–N,表示被信号N结束(了解即可)p.authkey
进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的 用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的 身份验证键时才能成功(了解即可)对于子进程、父进程、和守护进程之间的关系的理解
- 父进程与子进程之间的关系:假设是a进程创建了b进程,那么a进程就是b进程的父进程。反之,假设是b创建了a,那么b进程就是a的父进程。子进程的所有资源都继承父进程,得到父进程资源的副本,既然为副本,也就是说,二者并不共享地址空间。两个是单独的进程,继承了以后二者就没有什么关联了,子进程单独运行。
- 守护进程在父进程完成之后才结束
常见的
if __name__=='__main__'
也是一个进程,在其中创建的进程被称为是其的子进程守护进程
会随着主进程的结束而结束
- 主进程创建守护进程
- 守护进程会在主进程代码结束之后就终止
- 守护进程内无法再开启子进程,否则会抛出异常
示例代码
import os
import time
from multiprocessing import Process
class Myprocess(Process):
def __init__(self, person):
super().__init__()
self.person = person
def run(self):
print(os.getpid(), self.name)
print('%s正在和女主播聊天'%self.person)
if __name__ =='__main__':
p = Myprocess('陈松')
p.daemon =True
p.start()
time.sleep(10)
print('主')
import time
from multiprocessing import Process
def foo():
print(123)
time.sleep(1)
print('end123')
def bar():
print(456)
time.sleep(3)
print('end456')
if __name__ == '__main__':
p1 = Process(target=foo)
p2 = Process(target=bar)
p1.daemon = True
p1.start()
p2.start()
time.sleep(0.1)
print('main___')
在上述的代码之中,运行结果显示守护进程没有进行输出。起到守护的作用
socket聊天并发实例
服务端
from socket import *
from multiprocessing import Process
server = socket(AF_INET, SOCK_STREAM)
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('127.0.0.1',8080))
server.listen(5)
def talk(conn, client_addr):
while True:
try:
msg = conn.recv(1024)
if not msg:break
conn.send(msg.upper())
except Exception:
break
if __name__ =='__main__':
while True:
conn, client_addr = server.accept()
print(client_addr)
# talk(conn, client_addr)
p = Process(target=talk, args=(conn, client_addr))
p.start()
##这里使用进程的方式个人认为是因为要在一个任务完成之后结束该任务,而不是在阻塞另外一个进程,一般写的代码个人认为是单线程的
使用while 循环的方式,不断地创建进程,与客户端进行连接,不使用多进程的方式的话没有办法与多个客户端进行通信,会阻塞在其中的一个进程之中
客户端
from socket import *
client = socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1', 8080))
while True:
msg = input('>>').strip()
if not msg:continue
client.send(msg.encode('utf-8'))
msg = client.recv(1024)
print(msg.decode('utf-8'))
进程同步(multiprocess.Lock)
锁———multiprocess.Lock
当多个进程同时使用一份数据资源的时候,会引发数据安全或顺序混乱的问题
解决该问题的代码示例
import os, time, random
from multiprocessing import Process, Lock
def work(lock, n):
lock.acquire()
print('%s: %s is running' % (n, os.getpid()))
time.sleep(random.random())
print('%s: %s is done' % (n, os.getpid()))
lock.release()
if __name__ =='__main__':
lock = Lock()
for i in range(3):
p = Process(target=work, args=(lock, i))
p.start()
进程之间的通信——队列
进程间通信
队列中的经典的方法
Queue([maxsize])
创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。 底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道 中。 Queue的实例q具有以下方法: q.get( [ block [ ,timeout ] ] )
返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控 制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块 from multiprocessing import Process def work(): global n n = 0 print(‘子进程’,n) if name == ‘main‘: n = 100 p = Process(target=work) p.start() print(‘主进程’,n) 1 2 3 4 5 6 7 8 9 10 11 12 Queue([maxsize]) 创建共享的进程队列。 参数 :maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。 底层队列使用管道和锁定实现。 1 2 3 4 中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可 用,将引发Queue.Empty异常。q.get_nowait( )
同q.get(False)方法。 q.put(item [, block [,timeout ] ] )
将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默 认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。 timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。 q.qsize()
返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使 用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发 NotImplementedError异常。 q.empty()
如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是 不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。 q.full()
如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方 法)。。
q.close() 关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写 入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会 在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get() 操作上,关闭生产者中的队列不会导致get()方法返回错误。 q.cancel_join_thread()
不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。 q.join_thread()
连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下, 此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行 为。
示例代码1
from multiprocessing import Queue
q = Queue(3)
q.put(1)
q.put(2)
q.put(3)
try:
q.put_nowait(4)
except:
print('队列已经满了')
print(q.full())
print(q.get())
print(q.get())
print(q.get())
示例代码2
import time
from multiprocessing import Process, Queue
def f(q):
q.put([time.asctime(), 'from earth', 'hello'])
if __name__ =='__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get())
p.join()
import os
import time
from multiprocessing import Queue, Process, freeze_support
def inputQ(queue):
info = str(os.getpid() + '(put):' +str(time.asctime()))
queue.put(info)
def outputQ(queue):
info = queue.get()
print('%s%s\033[32m%s\033[0m'%(str(os.getpid()), '(get):',info))
if __name__ == 'main':
freeze_support()
record1 = []
record2 = []
queue = Queue(3)
for i in range(5):
process = Process(target=inputQ, args=(queue,))
time.sleep(1)
# process.start()
record1.append(process)
for i in range(5):
process = Process(target=outputQ, args=(queue,))
# process.start()
record2.append(process)
for p in record1:
p.start()
p.join()
for p in record2:
p.start()
p.join()
生产者消费者模型
from multiprocessing import Process, Queue
import time, random, os
def consumer(q):
while True:
res = q.get()
if res is None:break
time.sleep(random.randint(1,3))
print('%s 吃 %s' %(os.getpid(),res))
def producer(name,q):
for i in range(2):
time.sleep((random.randint(1,3)))
res = '%s%s'%(name, i)
if res is None: break
q.put(res)
print('%s生产了%s'%(os.getpid(), res))
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args = ('baozi', q))
p2 = Process(target=producer, args = ('gutou', q))
p3 = Process(target=producer, args = ('ganshui',q))
c1 = Process(target=consumer, args = (q,))
c2 = Process(target=consumer, args = (q, ))
c3 = Process(target=consumer, args = (q,))
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join()
q.put(None)
q.put(None)
print('zhu')
进程池和multiprocess.Pool模块
进程池
在成千上万个任务需要被执行的时候,我们使用这样的方式进行解决问题,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理玩波比,进程不关闭,而是将进程再放回进程池中继续等待任务。也就是说,池中的进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上实现并发的效果
在进程的使用过程中,往往进程的创建和删除才是最耗时的事情,所以进程池合理的解决了这个问题,所以调用进程池的方式会很节省时间
进程池和多进程的效率的对比
示例代码:
使用pool的程序
from multiprocessing import Pool
import time
def func(n):
for i in range(10):
print(n+1)
if __name__ == "__main__":
start = time.time()
pool = Pool(5)
pool.map(func, range(100))#使用map的方式,第一个参数是一个函数的地址,第二个参数是一个可迭代对象,作为前面的函数的输入参数
t2 = (time.time() - start)
print(t2)
使用Process的程序
from multiprocessing import Process
import time
def func(n):
for i in range(10):
print(n+1)
if __name__ == '__main__':
t1 = time.time()
p_list = []
for i in range(100):
p = Process(target=func, args=(i,))
p_list.append(p)
p.start()
for p in p_list:
p.join()
t2 = (time.time() - t1)
print(t2)
在对比两者之间的时间,发现使用进程池的方式可以大大减小程序所占用的时间
join()方法可以在当前位置阻塞主进程,待执行join()的进程结束之后再继续执行主进程的程序
异步调用和同步调用(调用是相对于进程池中的进程进行调用的)
同步调用示例
import os,time
from multiprocessing import Process, Pool
def work(n):
print('%s run'%os.getpid())
time.sleep(3)
return n**2
if __name__ == '__main__':
p = Pool(3)
res_l = []
for i in range(10):
res = p.apply(work, args=(i,))
print(res_l)
异步调用示例
import os, time, random
from multiprocessing import Pool
def work(n):
print('%s run' %os.getpid())
time.sleep(random.random())
return n**2
if __name__ == '__main':
p = Pool(3)#进程池中创建三个进程,以后一直使用这三个进程进行任务
res_l = []
for i in range(10):
res = p.apply_async(work, args=(i,))#异步运行,根据进程池中的进程数,每次最多三个进程运行
#返回结果之后,将结果放入列表之中,
#需要注意,进程池中的三个进程会不会同时开始或者同时结束
#而是执行完一个进程就释放一个进程,这个金策韩国就去接收新的任务
res_l.append(res)
p.close()
p.join()
for res in res_l:
print(res.get())
聊天客户端与服务端
from socket import SOCK_STREAM, AF_INET, socket, SOL_SOCKET, SO_REUSEADDR
from multiprocessing import Pool
import os
server = socket(AF_INET, SOCK_STREAM)
server.setsockopt(SOL_SOCKET, SO_REUSEADDR ,1)
server.bind(('127.0.0.1', 8081))
server.listen()
def talk(conn):
print('进程pid:%s' %os.getpid())
while True:
try:
msg = conn.recv(1024)
if not msg:break
conn.send(msg.upper())
except Exception:
break
if __name__ == '__main__':
p = Pool(4)
while True:
conn, addr = server.accept()
p.apply_async(talk, args=(conn, ))
聊天程序的主要流程
- 创建socket套接字
- 设置socket套接字在关闭端口之后,及时释放端口
- 将套接字绑定端口
- 持续监听端口
- 定义信息接收和处理函数
- 主函数中
- 初始化进程池
- 循环的接收conn和addr
- 异步调用进程池中的进程
顶级理解:进程池中的进程只是创建和申请了内存中的资源,并没有定义该进程需要做些什么,在使用的时候还需要再同步或者异步调用进程池中的进程,重新定义该进程的作用
客户端的代码
from socket import AF_INET, SOCK_STREAM, socket
client =socket(AF_INET, SOCK_STREAM)
client.connect(('127.0.0.1', 8081))
while True:
msg = input('>>: ').strip()
if not msg :break
client.send(msg.encode('utf-8'))
msg = client.recv(1024)
print(msg.decode('utf-8'))
客户端的主要程序
- 创建socket套接字(tcp协议)
- 将套接字连接在指定的端口上,相应的该端口的背后是服务端,所以相当于将该客户端连接到了服务端
- 循环的向服务端指令和信息
- 从服务端接收数据
- 打印数据
回调函数(没有完成,回头继续完成)
需要使用到回调函数的场景
- 进程池中的任何一个任务一旦处理完了,就立刻告诉主进程。我好了,你可以处理我的结果了,主进程则调用一个函数处理该结果,该函数即回调函数
- 我们可以把耗时的时间(阻塞)任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数的时候就省去了IO的过程,直接拿到的就是任务的结果