多任务
操作系统轮流让各个任务交替执行,任务1执行0.01秒,任务2执行0.01秒,再切换到任务3,执行0.01秒…表面上每个任务都是交替执行的,但是由于CPU执行速度过快,看似是同时进行
真正的并行多任务只能再多核CPU上实现,但是任务数量远远多于CPU的核心数量,所以操作系统也会自动把很多任务轮流调度到每个核心上执行
进程
程序编程写完没有运行成称之为程序。正在运行的代码就是进程。python中对多进程支持的是multiprocessing模块和subprocess模块。
进程创建
multiprocessing模块在子进程中运行任务、通信个共享数据,以及执行各种形式的同步提供支持。语法如下Process ([group[,target[,name[,args[,kwargs]]]]])
其中target表示调用对象,args表示调用对象的位置参数元组。kwargs表示调用对象的字典。name表示别名。group参数未使用,值始终为None
构造函数简单的构造一个Process进程,Process的实例方法、Process的实例属性如下:
Process实例方法表
| 方法 | 描述 |
|---|---|
| is_alive() | 如果p仍然在运行,则返回True |
| join(timeout) | 等待进行p终止。Timeout是可选的超时期限,进程可以被链接无数次,但如果连接自身则会出错 |
| run() | 进程启动时运行的方法。默认情况下,会调用传递给Process构造函数的target。定义进程的另一种方法是继承Process类并重新实现run()函数 |
| start() | 启动进程,这将运行代表进程的子进程,并调用该子进程的run()函数 |
| terminate() | 强制终止进程。如果调用此函数,进程p将被立即终止,同时不会进行任何清理动作。如果进程p创建了它自己的子进程,这些进程将会变成僵尸进程。使用此方法需要特别小心。如果p保存了一个锁或参与了进程通信,那么终止它可能会导致死锁或I/O损坏 |
#导入模块from multiprocessing import Processdef run_test():print("test")if __name__=="__main__":print("主进程执行")#创建子进程 target接收执行任务p=Process(target=run_test())#调用子进程p.start()
C:\Users...ke/Desktop/多线程和并发编程/创建子进程并执行.py主进程执行testProcess finished with exit code 0
#导入模块from multiprocessing import Processfrom time import sleep#定义任务的函数def run_test(name,age,**kwargs):print("子进程正在运行 name的值:%s,age的值:%d"%(name,age))print("字典的kwargs:",kwargs)if __name__=="__main__":print("主进程开始")#创建子进程p=Process(target=run_test,args=("test",23),kwargs={"key":12})#调用子进程p.start()
C:\Use.../创建子进程并传递参数.py主进程开始子进程正在运行 name的值:test,age的值:23字典的kwargs: {'key': 12}Process finished with exit code 0
join方法的使用
#导入模块from multiprocessing import Processfrom time import sleepdef worker(interval):print("work start")sleep(interval)print("work end")if __name__=="__main__":print("主进程")p=Process(target=worker,args=(5,))p.start()p.join(3)print("主进程执行完")
C:\Use...join方法的使用.py主进程work start主进程执行完work endProcess finished with exit code 0
Process实例属性表
| 方法 | 描述 |
|---|---|
| name | 进程的名字 |
| pid | 进程的整数进程ID |
#导入模块import multiprocessingimport timedef colck(interval):for i in range(3):print("当前时间;{}".format((time.ctime())))time.sleep(interval)if __name__=="__main__":p=multiprocessing.Process(target=colck,args=(1,))p.start()p.join()print("p.id",p.pid)print("p.name",p.name)print("p_alive",p.is_alive())
C:\Uses...Process的两个属性.py当前时间;Mon May 16 12:45:06 2022当前时间;Mon May 16 12:45:07 2022当前时间;Mon May 16 12:45:08 2022p.id 15952p.name Process-1p_alive FalseProcess finished with exit code 0
多任务的创建
#导入模块from multiprocessing import Processfrom time import sleepdef work1(interval):print("执行work1")sleep(interval)print("end work1")def work2(interval):print("执行work2")sleep(interval)print("end work2")def work3(interval):print("执行work3")sleep(interval)print("end work3")if __name__=="__main__":print("执行主任务")p1=Process(target=work1,args=(4,))p2 = Process(target=work2, args=(2,))p3 = Process(target=work3, args=(3,))p1.start()p2.start()p3.start()p1.join()p2.join()p3.join()print("主进程结束")
C:\Users...多进程的创建.py执行主任务执行work2执行work1执行work3end work2end work3end work1主进程结束进程已结束,退出代码0
进程的创建Process子类
创建进程的方式还可以使用类的方式,可以自定义一个类,继承Process类,每次实例化这个类的时候,就等同于实例化一个进程的对象。
#导入模块import timefrom multiprocessing import Processfrom time import sleep#定义类class ClockProcess(Process):def __init__(self,interval):Process.__init__(self)self.interval=interval#重写run方法def run(self):print("子进程开始的时间;{}".format(time.ctime()))sleep(self.interval)print("子进程结束的时间;{}".format(time.ctime()))if __name__=="__main__":p=ClockProcess(3)p.start()p.join()print("主进程执行结束")
C:\Users...使用进程方式创建进程.py子进程开始的时间;Mon May 16 13:08:24 2022子进程结束的时间;Mon May 16 13:08:27 2022主进程执行结束进程已结束,退出代码0
进程池
在利用python进行系统管理的时候,特别是同时操作多个文件目录,或控制多台主机,并进行操作可以节约大量的时间,当操作对象数目不大时,可以利用multiprocessing中的Process动态生成多个进程。
pool可以通过该指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程来执行该请求;但是如果池中的进程数已经达到了规定的最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程。Pool([numprocess[,initializer[,initargs]]])
其中numprocess是创建的进程数。如果忽略此参数,将使用cpu_count()的值。Initializer是每个工作进程启动时要执行的可调用对象。Initargs是要传递给initialize的参数元组。Initializer默认为None
Pool实例方法
| 方法 | 描述 |
|---|---|
| apply(func [,args[,kwargs]]) | 在同一个池工作进程中执行函数(args,*kwargs),然后返回结果 |
| apply_async(func[,args[,kwargs[,callback]]]) | 在一个池工作中异步的执行函数args,*kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,稍后可用于获取最终结果。Callback是可调用对象,接受输出参数。当func的结果变为可用时,将立即传递给callback。Callback禁止执行任何阻塞操作,否则将阻塞接收其他异步操作中的结果 |
| close() | 关闭进程池,防止进行进一步操作。如果还有挂起操作,他们将在工作进程终止之前完成 |
| join | 等待所有工作进程退出,此方法只能在close()或者terminate()方法之后调用 |
| imap(func, iterable,[, chunksize]) | map函数版本之一,返回迭代器而非结果列表 |
| imap——unordered( func, iterable[, chunksize]) | 同imap()函数一样,只是结果的顺序根据从工作进程接收到的时间任意确定 |
| map(func, iterable[, chunkusze]) | 将可调用对象func应用给iterable中的所有项,然后以列表的形式返回结果,通过将iterable划分为多块并将工作分派给工作进程,可以并行的执行这项操作。chunksize指定每块中的项数。如果数量较大,可以增大chunksize的值来提升性能 |
| map_async(func, iterable[,chunksize[,callback]]) | 同map()函数,但结果的返回是异步的。返回值是AsyncResult类的实例,稍后可用于获取结果。Callback是指接收一个参数的可调用对象。如果提供callable,当结果变为可用时,将是哦也能够结果调用的callable |
| terminate() | 立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数 |
| ready() | 如果调用完成,则返回True |
| successful | 如果调用完成并没有引发异常,返回True,如果就在结果就绪之前调用次方法,将引发AssertionError异常 |
| wait[timeout] | 等待结果变为可用,Timeout是可选的超时 |
注意:apply_async(func [,args[,kwds[,callback]]])它是非阻塞,app(func[,args[,kwds]])是阻塞的
import multiprocessingimport timedef func(msg):print("start",msg)time.sleep(3)print("end",msg)if __name__=="__main__":pool=multiprocessing.Pool(3)for i in range(1,6):msg="任务%d"%ipool.apply_async(func,(msg,))#如果进程池不再接收新的请求 调用closepool.close()#等待子进程结束pool.join()
C:...进程池非阻塞状态的使用.pystart 任务1start 任务2start 任务3end 任务1start 任务4end 任务2start 任务5end 任务3end 任务4end 任务5进程已结束,退出代码0
进程间通信
全局变量在多个进程中不共享,进程之间的数据是独立的,默认情况下互补影响
from multiprocessing import Processnum=10def work1():global numnum+=5print("子进程1运行后num的值",num)def work2():global numnum+=10print("子进程2运行后num的值",num)if __name__=="__main__":print("主进程开始运行")p1=Process(target=work1)p2=Process(target=work2)p1.start()p2.start()p1.join()p2.join()print(num)
C:\User...多个进程之间的数据是否共享.py主进程开始运行子进程1运行后num的值 15子进程2运行后num的值 2010进程已结束,退出代码0
Queue是多个进程安全的队列,可以使用Queue实现多个进程之间的数据传递。put方法用于插入数据到队列中,put方法还有两个可选参数:block和timeout。如果block为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间,如果超时,会抛出Queue.full异常。如果block为False,但该Queue已满,会立即抛出Queue.full异常
get方法可以从队列读取并删除以一个元素。同样,get方法有两个可选参数:blocked和timeout,如果block为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果block为False,有两种情况存在,如果Queuey有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常
from multiprocessing import Queueq=Queue(3)#可以指定队列大小,如果不写默认的队列是无限的#向队列中插入元素q.put("消息1")q.put("消息2")q.put("消息3")#put方法中可选参数,block=True,timeout=1,队列已满,等待1s,如果还是没有剩余空间,则跑队列已满的异常q.put("消息4",block=True,timeout=1)print("判断当前队列是否已满",q.full())if not q.full():q.put("消息4",block=True,timeout=1)#读取并删除元素getprint(q.get())print(q.get())print(q.get())if not q.empty():print(q.get(block=True,timeout=1))#查看队列大小print("队列大小",q.qsize())
Queue队列实现进程间通信
from multiprocessing import Process,Queue
from time import sleep
#定义写入的方法
def write(q):
a=['a','b','c','d']
for i in a:
print("开始写入的值:%s"%i)
q.put(i)
sleep(1)
def reader(q):
for i in range(q.qsize()):
print("读取到的值:%s"%q.get())
sleep(1)
if __name__=="__main__":
q=Queue()
pw=Process(target=write,args=(q,))
pr=Process(target=reader,args=(q,))
pw.start()
pw.join()
pr.start()
pr.join()
C:\User...多进程之间通信.py
开始写入的值:a
开始写入的值:b
开始写入的值:c
开始写入的值:d
读取到的值:a
读取到的值:b
读取到的值:c
读取到的值:d
进程已结束,退出代码0
如果使用Pool创建进程,就需要使用multiprocessing.Manager()中的Queue()来完成进程之间的通信,而不是multiprocessing.Queue ,否则会抛出如下异常RuntimeError: Queue objects should only be shared between processes through inheritance
from multiprocessing import Process,Pool,Manager
from time import sleep
#定义写入的方法
def write(q):
a=['a','b','c','d']
for i in a:
print("开始写入的值:%s"%i)
q.put(i)
sleep(1)
def reader(q):
for i in range(q.qsize()):
print("读取到的值:%s"%q.get())
sleep(1)
if __name__=="__main__":
q=Manager().Queue()
pool=Pool(3)
pool.apply(write,(q,))
pool.apply(reader,(q,))
pool.close()
pool.join()
线程
线程也是实现多任务的一种方式,一个线程中,也经常需要同时做多件事,就需要同时运行多个“子任务”,这些子任务就是线程。一个线程可以拥有多个并行的线程,其中每一个线程,共享当前进程的资源
| 区别 | 进程 | 线程 |
|---|---|---|
| 根本区别 | 作为资源分配的单元 | 调度和执行的单位 |
| 开销 | 每一个进程都有独立的代码和数据空间,进程间的切换会有很大的开销 | 线程可以看出是轻量级的进程,多个线程共享内存,线程切换的开销小 |
| 所处环境 | 在操作系统中,同时运行的多个任务 | 在程序中多个顺序流同时执行 |
| 分配内存 | 系统在运行时为每一个进程分配不同的内存区域 | 线程所使用的资源是它所属进程的资源 |
| 包含关系 | 一个进程内可以拥有多个线程 | 线程是进程的一部分,所有线程有时候称为是轻量级的进程 |
进程和线程在使用上各有优缺点,线程执行开销小,但不利于资源的管理和保护,而进程正相反
_thread模块
在python中,可以使用两种方式俩使用线程,使用函数或者使用类包装线程对象。当使用thread模块来处理线程时,可以调用里面的一些函数start_new_thread()来生成一个新的线程。语法格式如下:_thread.start_new_thread[function,args[,kwargs]]
其中function是线程函数,args表示传递线程的参数,它必须是个tuple类型;kwargs是可选参数
import _thread
import time
def fun1():
print("开始运行fun1")
time.sleep(4)
print("运行fun1结束")
def fun2():
print("开始运行fun2")
time.sleep(2)
print("运行fun2结束")
if __name__=="__main__":
print("主程序开始运行")
_thread.start_new_thread(fun1,())
_thread.start_new_thread(fun2,())
time.sleep(7)
C:..._thread实现线程.py
主程序开始运行
开始运行fun2
开始运行fun1
运行fun2结束
运行fun1结束
进程已结束,退出代码0
从程序运行结果可以看出,在fun2函数中调用了sleep函数休眠,当休眠期间,会释放CPU的计算资源,这时fun1抢占了CPU资源开始执行
threading模块
threading模块除了包含_thread模块中的所有方法外,还提供了其他方法:
- threading.currentThread():返回当前的线程变量
- threading.enumerate():返回一个包含正在运行的线程的list。正在运行指线程启动后,结束前,不包括启动前和终止后的线程
- threading。activeCount():返回正在运行的线程数量,与len(threading.enumerate())有相同的结果
threading模块有两种方式来创建线程:
- 通过threading.Thread直接在线程中运行函数
- 通过继承threading.Thread来创建线程
Thread[group=None,target=None,name=None,args=[],keargs={}]
target:要执行的方法;name:线程args/kwargs:要传入方法的参数
Thread类的方法
| 方法名 | 描述 |
|---|---|
| run() | 用以表示线程活动的方法 |
| start() | 启动线程活动 |
| join([time]) | 等待至线程终止。这阻塞调用线程值线程的join()方法被调用终止-正常退出或者抛出处理的异常或者是可选的超时发生 |
| isAlive() | 返回线程是否活动的 |
| getName() | 返回线程名 |
| setName() | 设置线程名 |
import threading
import time
def fun1(thread_name,delay):
print("线程{}开执行fun1".format(thread_name))
time.sleep(delay)
print("线程{}运行fun1结束".format(thread_name))
def fun2(thread_name,delay):
print("线程{}开执行fun2".format(thread_name))
time.sleep(delay)
print("线程{}运行fun2结束".format(thread_name))
if __name__ == '__main__':
print("开始运行")
#创建线程
t1=threading.Thread(target=fun1,args=("thread-1",2))
t2 = threading.Thread(target=fun2, args=("thread-2", 3))
#启动线程
t1.start()
t2.start()
t1.join()
t2.join()
C:\Users.../threading中Thread创建线程.py
开始运行
线程thread-1开执行fun1
线程thread-2开执行fun2
线程thread-1运行fun1结束
线程thread-2运行fun2结束
进程已结束,退出代码0
在python中,通过继承类threading.Thread的方式来创建一个线程。这种方法只要重写类threading.Thread中的方法run(),然后再调用方法start()就能创建线程,并运行方法run()中的代码
import threading
import time
def fun1(delay):
print("线程{}执行fun1".format(threading.current_thread().getName()))
time.sleep(delay)
print("线程{}执行fun1结束".format(threading.current_thread().getName()))
def fun2(delay):
print("线程{}执行fun2".format(threading.current_thread().getName()))
time.sleep(delay)
print("线程{}执行fun2结束".format(threading.current_thread().getName()))
#创建一个类MyThread继承threading.Thread
class MyThread(threading.Thread):
#重新构造方法
def __init__(self,func,name,args):
super().__init__(target=func,name=name,args=args)
def run(self):
self._target(*self._args)
if __name__ == '__main__':
print("开始运行")
t1=MyThread(fun1,"thread-1",(2,))
t2 = MyThread(fun2, "thread-2", (4,))
t1.start()
t2.start()
t1.join()
t2.join()
C:\Use...threading.Tread实现线程.py
开始运行
线程thread-1执行fun1
线程thread-2执行fun2
线程thread-1执行fun1结束
线程thread-2执行fun2结束
进程已结束,退出代码0
线程共享全局变量
在一个进程内所有线程共享全局变量,多个线程之间的数据共享比多进程要好。但可能造成多个进程同时修改一个变量(即线程非安全),可能造成混乱
import time
from threading import Thread
num=10
def test1():
global num
for i in range(3):
num+=1
print("执行test1函数的值",num)
def test2():
print("执行test2函数的值", num)
if __name__ == '__main__':
t1=Thread(target=test1)
t2=Thread(target=test2)
t1.start()
t1.join()
t2.start()
t2.join()
C:.../线程共享全局变量.py
执行test1函数的值 13
执行test2函数的值 13
进程已结束,退出代码0
互斥锁
如果多个线程共同对某个数据修改,则可能出现不可预料的结果,为了保证数据的正确性,需要对多个线程进行同步。最简单的同步机制就是引入互斥锁
锁有两种状态—锁定和未锁定。某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态改为“非锁定”状态,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。
使用Thread对象的Lock可以实现简答的线程同步,有上锁acquire方法和释放release方法,对于那些需要每次只允许一个线程的操作数据,可以将其操作放到acquire和release方法之间
import time
from threading import Thread,Lock
num=0
#创建一个互斥锁
lock=Lock()
def test1():
global num
for i in range(1000000):
lock.acquire() # 上锁
num += 1
lock.release()#释放锁
print("执行test1函数的值", num)
def test2():
global num
for i in range(1000000):
lock.acquire() # 上锁
num+=1
lock.release() # 释放锁
print("执行test2函数的值", num)
if __name__ == '__main__':
t1=Thread(target=test1)
t2=Thread(target=test2)
t1.start()
t1.join()
t2.start()
t2.join()
C:\User...互斥锁的使用.py
执行test1函数的值 1000000
执行test2函数的值 2000000
进程已结束,退出代码0
线程同步的应用
同步就是协同步调,按预定的先后次序运行。如进行、线程同步,可以理解为进程或者为线程A和B一块配合,A执行到一定程度时要依靠B的某个结果,于是停下来,示意B运行,B运行后将结果给A,A继续运行
from threading import Thread,Lock
import time
#创建三把互斥锁
lock1=Lock()
lock2=Lock()
lock3=Lock()
#对lock2和lock3上锁
lock2.acquire()
lock3.acquire()
class Task1(Thread):
def run(self):
while True:
if lock1.acquire():
print("task1")
time.sleep(1)
#释放lock2的锁
lock2.release()
class Task2(Thread):
def run(self):
while True:
if lock2.acquire():
print("task2")
time.sleep(1)
#释放lock3的锁
lock3.release()
class Task3(Thread):
def run(self):
while True:
if lock3.acquire():
print("task3")
time.sleep(1)
#释放lock1的锁
lock1.release()
if __name__ == '__main__':
t1=Task1()
t2 = Task2()
t3 = Task3()
t1.start()
t2.start()
t3.start()
生产者消费者模式
生产者就生产数据的线程,消费者就是消费数据的线程。生产者消费者模式通过一个容器来解决生产者和消费者的枪耦合问题,生产者和消费者之间不通信。生产者生产商品,然后将其放到类似队列的数据结构中,消费者不找生产者要数据,而是直接从队列中取。
from threading import Thread
from queue import Queue
import time
class Producter(Thread):
def run(self):
global queue
count=0
while True:
#判断队列大小
if queue.qsize()<1000:
for i in range(100):
count += 1
msg = "生产第" + str(count) + "个产品"
queue.put(msg)
print(msg)
time.sleep(0.5)
class Consumer(Thread):
def run(self):
global queue
while True:
if queue.qsize()>100:
for i in range(10):
msg=self.name+"消费"+queue.get()
print(msg)
time.sleep(1)
if __name__ == '__main__':
queue=Queue()
p=Producter()
c=Consumer()
p.start()
time.sleep(1)
c.start()
ThreadLocal
它本身是一个全局变量,但是每个线程却可以利用它来保存属于自己的私有数据,这些私有数据对其他线程也是不可见的
import threading
#创建ThreadLocal对象
local=threading.local()
def process_student():
student_name=local.name
print("线程名:%s 学生姓名:%s"%(threading.current_thread().getName(),student_name))
def process_thead(name):
local.name=name
process_student()
t1=threading.Thread(target=process_thead,args=("张三",),name="Thread-A")
t2=threading.Thread(target=process_thead,args=("李四",),name="Thread-B")
t1.start()
t2.start()
t1.join()
t2.join()
C:\Users\.../ThreadLocal的使用.py
线程名:Thread-A 学生姓名:张三
线程名:Thread-B 学生姓名:李四
进程已结束,退出代码0
