网络知识
CS架构与BS架构
Client<===========>Server客户端软件send 服务端软件recv操作系统 操作系统计算机硬件<====物理介质=====>计算机硬件Browser<===========>Server
# 2、网络通信网络存在的意义就是跨地域数据传输=》称之为通信网络=物理链接介质+互联网通信协议# 3、OSI七层协议五层协议应用层传输层网络层数据链路层物理层协议:规定数据的组织格式格式:头部+数据部分封包裹的过程:数据外加头拆包裹的过程:拆掉头获取数据
五层协议
#4、五层协议计算机1: 计算机2:应用层 应用层传输层 传输层网络层 网络层数据链路层 数据链路层物理层 <===========交互机===========> 物理层0101010101010(源mac地址,目标mac地址)(源ip地址,目标ip地址)数据#4.1 物理层负责发送电信号一组物理层数据称之为:位单纯的电信号毫无意义,必须对其进行分组#4.2 数据链路层:ethernet以太网协议规定1:一组数据称之为一个数据帧规定2:数据帧分成两部分=》头+数据头包含:源地址与目标地址,该地址是mac地址数据包含:包含的是网络层发过来的整体的内容规定3:规定但凡接入互联网的主机必须有一块网卡,每块网卡在出厂时都烧制好一个全世界独一无二的地址,该地址称之为mac地址注意:计算机通信基本靠吼,即以太网协议的工作方式是广播(egon,血嫌弃)(帮我买包子)
网络层
#4.3 网络层:IP协议要达到的目的:划分广播域每一个广播域但凡要接通外部,一定要有一个网关帮内部的计算机转发包到公网网关与外界通信走的是路由协议规定1:一组数据称之为一个数据包规定2:数据帧分成两部分=》头+数据头包含:源地址与目标地址,该地址是IP地址数据包含的:传输层发过来的整体的内容ipv4地址:8bit.8bit.8bit.8bit0.0.0.0255.255.255.255子网掩码:8bit.8bit.8bit.8bit255.255.255.0对应的二进制表达11111111.11111111.11111111.00000000一个合法的ipv4地址组成部分=ip地址/子网掩码地址172.16.10.1/255.255.255.0172.16.10.1/24计算机1:172.16.10.1: 10101100.00010000.00001010.000000001255255.255.255.0: 11111111.11111111.11111111.000000000172.16.10.0: 10101100.00010000.00001010.000000000计算机2:172.16.10.2: 10101100.00010000.00001010.000000010255.255.255.255.0: 11111111.11111111.11111111.000000000172.16.10.0: 10101100.00010000.00001010.000000000计算机1: 计算机2:应用层 应用层传输层 传输层网络层 网络层数据链路层 数据链路层物理层 <=========二层交互机========> 物理层0101010101010(源mac地址,xxxx)(源ip地址,目标ip地址)数据(源mac地址,网关的mac地址)(172.16.10.10/24,101.100.200.11/10)数据事先知道的是对方的ip地址但是计算机的底层通信是基于ethernet以太网协议的mac地址通信ARP:所以必须能够将ip地址解析成mac地址# 两台计算机在同一个局域网内计算机1:172.16.10.10/24 直接 计算机2:172.16.10.11/24ARP:自己的ip,对方的ip1、计算二者网络地址,如果一样,拿到计算机2的mac地址就可以了2、发送广播包发送端mac FF:FF:FF:FF:FF:FF 172.16.10.10/24 172.16.10.11/24 数据# 两台计算机不在同一个局域网内计算机1:172.16.10.10/24 网关 计算机2:101.100.200.11/10ARP:自己的ip,对方的ip1、计算机二者网络地址,如果不一样,应该拿到网关的mac地址2、发送广播包发送端mac FF:FF:FF:FF:FF:FF 172.16.10.10/24 172.16.10.1/24 数据#4.3.1 总结******ip地址+mac地址=》标识全世界范围内独一无二的一台计算机或者:ip地址=》标识全世界范围内独一无二的一台计算机
传输层
五层协议计算机1: 计算机2:应用层 应用层socket socket传输层 段 传输层网络层 包 网络层数据链路层 帧 数据链路层物理层 <===========交互机===========> 物理层客户端软件send 服务端软件recv操作系统 操作系统计算机硬件<====物理介质=====>计算机硬件ethernet头+ip头+tcp头+应用层的头+应用层数据#一:传输层 tcp\udp=》基于端口端口范围0-65535,0-1023为系统占用端口ip+port=》标识全世界范围内独一无二的一个基于网络通信的应用程序基于tcp协议通信之前:必须建立一个双向通信的链接C-------------------->SC<--------------------S三次握手建立链接:建立链接是为了传数据做准备的,三次握手即可四次挥手断开链接断开链接时,由于链接内有数据传输,所以必须分四次断开tcp是可靠传输的发送数据必须等到对方确认后才算完成,才会将自己内存中的数据清理掉,否则重传ps:当服务端大量处于TIME_WAIT状态时意味着服务端正在经历高并发tcp协议的半连接池:backlog[链接请求1,链接请求2,链接请求3,链接请求5]#二:应用层:可以自定义协议=》头部+数据部分自定义协议需要注意的问题:1、两大组成部分=头部+数据部分头部:放对数据的描述信息比如:数据要发给谁,数据的类型,数据的长度数据部分:想要发的数据2、头部的长度必须固定因为接收端要通过头部获取所接接收数据的详细信息http https ftpwww.163.com.三:socket介绍import socket
网络编程socket
基于tcp协议的套接字通信
服务端
import socket# 1、买手机phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM) # socket.AF_INET用于跨网络通信,socket.SOCK_STREAM流式协议=》tcp协议# 2、绑定手机卡phone.bind(('127.0.0.1',8081)) # 0-65535, 1024以前的都被系统保留使用# 3、开机phone.listen(5) # 5指的是半连接池的大小print('服务端启动完成,监听地址为:%s:%s' %('127.0.0.1',8080))# 4、等待电话连接请求:拿到电话连接connconn,client_addr=phone.accept() #获取连接对象及连接的ip地址# print(conn)print("客户端的ip和端口:",client_addr)# 5、通信:收\发消息data=conn.recv(1024) # 最大接收的数据量为1024Bytes,收到的是bytes类型print("客户端发来的消息:",data.decode('utf-8'))conn.send(data.upper())# 6、关闭电话连接conn(必选的回收资源的操作)conn.close()# 7、关机(可选操作)phone.close()
客户端
import socket#1、买手机phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM) # socket.AF_INET用于跨网络通信,socket.SOCK_STREAM流式协议=》tcp协议#2、拨通服务端电话phone.connect(('127.0.0.1',8081))#3、通信import timetime.sleep(10)phone.send('hello egon 哈哈哈'.encode('utf-8'))data=phone.recv(1024)print(data.decode('utf-8'))#4、关闭连接(必选的回收资源的操作)phone.close()
服务端加上通信循环
服务端
import socket# 1、买手机phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM) # 流式协议=》tcp协议# 2、绑定手机卡phone.bind(('127.0.0.1',8083)) # 0-65535, 1024以前的都被系统保留使用# 3、开机phone.listen(5) # 5指的是半连接池的大小print('服务端启动完成,监听地址为:%s:%s' %('127.0.0.1',8080))# 4、等待电话连接请求:拿到电话连接connconn,client_addr=phone.accept()# 5、通信循环:收\发消息,只有一层while truewhile True:try:data=conn.recv(1024) # 最大接收的数据量为1024Bytes,收到的是bytes类型if len(data) == 0:# 在unix系统洗,一旦data收到的是空# 意味着是一种异常的行为:客户度非法断开了链接breakprint("客户端发来的消息:",data.decode('utf-8'))conn.send(data.upper())except Exception:# 针对windows系统break# 6、关闭电话连接conn(必选的回收资源的操作)conn.close()# 7、关机(可选操作)phone.close()
客户端
import socket#1、买手机phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM) # 流式协议=》tcp协议#2、拨通服务端电话phone.connect(('127.0.0.1',8083))#3、通信while True:msg=input("输入要发送的消息>>>: ").strip() #msg=''if len(msg) == 0:continuephone.send(msg.encode('utf-8'))print('======?')data=phone.recv(1024)print(data.decode('utf-8'))#4、关闭连接(必选的回收资源的操作)phone.close()
服务端加上链接循环
# 服务端应该满足的特点:# 1、一直提供服务# 2、并发地提供服务import socket# 1、买手机phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM) # 流式协议=》tcp协议# 2、绑定手机卡phone.bind(('127.0.0.1',8080)) # 0-65535, 1024以前的都被系统保留使用# 3、开机phone.listen(5) # 5指的是半连接池的大小print('服务端启动完成,监听地址为:%s:%s' %('127.0.0.1',8080))# 4、等待电话连接请求:拿到电话连接conn# 加上链接循环while True:conn,client_addr=phone.accept() #链接循环,整个处理消息的过程是单线程的。# 5、通信:收\发消息while True:try:data=conn.recv(1024) # 最大接收的数据量为1024Bytes,收到的是bytes类型if len(data) == 0:# 在unix系统洗,一旦data收到的是空# 意味着是一种异常的行为:客户度非法断开了链接breakprint("客户端发来的消息:",data.decode('utf-8'))conn.send(data.upper())except Exception:# 针对windows系统break# 6、关闭电话连接conn(必选的回收资源的操作)conn.close()# 7、关机(可选操作)phone.close()
基于udp协议的套接字通信
服务端
import socketserver=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # 数据报协议=》udp协议server.bind(('127.0.0.1',8081))while True:data,client_addr=server.recvfrom(1024)server.sendto(data.upper(),client_addr)server.close()
客户端
import socketclient=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # 流式协议=》tcp协议while True:msg=input('>>>: ').strip()client.sendto(msg.encode('utf-8'),('127.0.0.1',8081))res=client.recvfrom(1024)print(res)client.close()
基于tcp协议远程执行命令
服务端
# 服务端应该满足两个特点:# 1、一直对外提供服务# 2、并发地服务多个客户端import subprocessfrom socket import *server=socket(AF_INET,SOCK_STREAM)server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #就是它,在bind前加,针对windows系统实现端口重用server.bind(('127.0.0.1',8082))server.listen(5)# 服务端应该做两件事# 第一件事:循环地从板连接池中取出链接请求与其建立双向链接,拿到链接对象while True:conn,client_addr=server.accept()# 第二件事:拿到链接对象,与其进行通信循环while True:try:cmd=conn.recv(1024) #当发送的字节数超过1024会产生粘包问题if len(cmd) == 0:break #客户端发送cmd命令给服务端执行obj=subprocess.Popen(cmd.decode('utf-8'), #开启子进程执行命令shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)stdout_res=obj.stdout.read()stderr_res=obj.stderr.read()print(len(stdout_res)+len(stderr_res))# conn.send(stdout_res+stderr_res) # ???conn.send(stdout_res)conn.send(stderr_res)# with open("1.mp4",mode='rb') as f:# for line in f:# conn.send(line)except Exception:breakconn.close()
客户端
from socket import *client=socket(AF_INET,SOCK_STREAM)client.connect(('127.0.0.1',8082))while True:cmd=input('请输入命令>>:').strip()if len(cmd) == 0:continueclient.send(cmd.encode('utf-8'))# 解决粘包问题思路:# 1、拿到数据的总大小total_size# 2、recv_size=0,循环接收,每接收一次,recv_size+=接收的长度# 3、直到recv_size=total_size,结束循环cmd_res=client.recv(1024) # 本次接收,最大接收1024Bytesprint(cmd_res.decode('utf-8')) # 强调:windows系统用gbk# 粘包问题出现的原因# 1、tcp是流式协议,数据像水流一样粘在一起,没有任何边界区分# 2、收数据没收干净,有残留,就会下一次结果混淆在一起# 解决的核心法门就是:每次都收干净,不要任何残留
udp协议没有粘包问题
服务端
import socketserver=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)server.bind(('127.0.0.1',8080))res1=server.recvfrom(2) # b"hello"print(res1)res2=server.recvfrom(3) # b"world"print(res2)server.close()
客户端
import socketclient=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)client.sendto(b'hello',('127.0.0.1',8080))client.sendto(b'world',('127.0.0.1',8080))client.close()
解决粘包问题
服务端
# 服务端应该满足两个特点:# 1、一直对外提供服务# 2、并发地服务多个客户端import subprocessimport structfrom socket import *server=socket(AF_INET,SOCK_STREAM)server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #就是它,在bind前加server.bind(('127.0.0.1',8083))server.listen(5)# 服务端应该做两件事# 第一件事:循环地从板连接池中取出链接请求与其建立双向链接,拿到链接对象while True:conn,client_addr=server.accept()# 第二件事:拿到链接对象,与其进行通信循环while True:try:cmd=conn.recv(1024)if len(cmd) == 0:breakobj=subprocess.Popen(cmd.decode('utf-8'),shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)stdout_res=obj.stdout.read()stderr_res=obj.stderr.read()total_size=len(stdout_res)+len(stderr_res)# 1、先发头信息(固定长度的bytes):对数据描述信息# int->固定长度的bytesheader=struct.pack('i',total_size) #将数据打包成整数类型二进制conn.send(header) #发送总数据长度的二进制格式# 2、再发真实的数据conn.send(stdout_res) #将执行结果发送给客户端conn.send(stderr_res) #将执行错误信息发送给客户端except Exception:breakconn.close()
客户端
import structfrom socket import *client=socket(AF_INET,SOCK_STREAM)client.connect(('127.0.0.1',8083))while True:cmd=input('请输入命令>>:').strip()if len(cmd) == 0:continueclient.send(cmd.encode('utf-8'))# 解决粘包问题思路:# 一、先收固定长度的头:解析出数据的描述信息,包括数据的总大小total_sizeheader=client.recv(4)total_size=struct.unpack('i',header)[0] #i代表interger整数类型# 二、根据解析出的描述信息,接收真实的数据# 2、recv_size=0,循环接收,每接收一次,recv_size+=接收的长度# 3、直到recv_size=total_size,结束循环recv_size = 0while recv_size < total_size:recv_data=client.recv(1024)recv_size+=len(recv_data)print(recv_data.decode('utf-8'),end='')else:print()# 粘包问题出现的原因# 1、tcp是流式协议,数据像水流一样粘在一起,没有任何边界区分# 2、收数据没收干净,有残留,就会下一次结果混淆在一起# 解决的核心法门就是:每次都收干净,不要任何残留
解决粘包问题(终极版)
struct.pack
res=struct.pack(‘i’,999999999) #对999999999以内数字封包都是4个字节长度。
import jsonimport structheader_dic={"filename":"a.txt","total_size":12312312122222222222222222222222223123123123123121222222222222222222222222231231231231231212222222222222222222222222312312312312312122222222222222222222222223123123123123121222222222222222222222222231231231231231212222222222222222222222222312312312312312122222222222222222222222223123123123123121222222222222222222222222231231231231231212222222222222222222222222312312312312312122222222222222222222222223123123123123121222222222222222222222222231231231231231212222222222222222222222222312312312312312122222222222222222222222223123123123123121222222222222222222222222231231231231231212222222222222222222222222312312312311231231212222222222222222222222222312312312312312122222222222222222222222223123123123123121222222222222222222222222231231231231231212222222222222222222222222312312312312312122222222222222222222222223123123123123121222222222222222222222222231231231231231212222222222222222222222222312312312312312122222222222222222222222223123123123123121222222222222222222222222231231231231231212222222222222222222222222312312312312312122222222222222222222222223123123123123121222222222222222222222222231231231231231212222222222222222222222222312312312312312122222222222222222222222223123123123123121222222222222222222222222231231231231231212222222222222222222222222312312312312312122222222222222222222222223123123123123121222222222222222222222222231231231231231212222222222222222222222222312312312312312122222222222222222222222223123123123123121222222222222222222222222231231232312122222222222222222222222223123123,"md5":"123123xi12ix12","asf":"123123xi12ix12","asf1":"123123xi12ix12","asf2":"123123xi12ix12","asf3":"123123xi12ix12",}json_str=json.dumps(header_dic)json_str_bytes=json_str.encode('utf-8')print(len(json_str_bytes))res=struct.pack('i',999999999) #对999999999以内数字封包都是4个字节长度。print(res,len(res))res=struct.pack('i',len(json_str_bytes))
服务端
# 服务端应该满足两个特点:# 1、一直对外提供服务# 2、并发地服务多个客户端import subprocessimport structimport jsonfrom socket import *server=socket(AF_INET,SOCK_STREAM)server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #就是它,在bind前加server.bind(('127.0.0.1',8083))server.listen(5)# 服务端应该做两件事# 第一件事:循环地从板连接池中取出链接请求与其建立双向链接,拿到链接对象while True:conn,client_addr=server.accept()# 第二件事:拿到链接对象,与其进行通信循环while True:try:cmd=conn.recv(1024)if len(cmd) == 0:breakobj=subprocess.Popen(cmd.decode('utf-8'),shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)stdout_res=obj.stdout.read()stderr_res=obj.stderr.read()total_size=len(stdout_res)+len(stderr_res)# 1、制作头header_dic={"filename":"a.txt","total_size":total_size,"md5":"123123xi12ix12"}json_str = json.dumps(header_dic)json_str_bytes = json_str.encode('utf-8')# 2、先把头的长度发过去x=struct.pack('i',len(json_str_bytes))conn.send(x)# 3、发头信息conn.send(json_str_bytes)# 4、再发真实的数据conn.send(stdout_res)conn.send(stderr_res)except Exception:breakconn.close()
客户端
import structimport jsonfrom socket import *client=socket(AF_INET,SOCK_STREAM)client.connect(('127.0.0.1',8083))while True:cmd=input('请输入命令>>:').strip()if len(cmd) == 0:continueclient.send(cmd.encode('utf-8'))# 接收端# 1、先手4个字节,从中提取接下来要收的头的长度x=client.recv(4)header_len=struct.unpack('i',x)[0]# 2、接收头,并解析json_str_bytes=client.recv(header_len)json_str=json_str_bytes.decode('utf-8')header_dic=json.loads(json_str)print(header_dic)total_size=header_dic["total_size"]# 3、接收真实的数据recv_size = 0while recv_size < total_size:recv_data=client.recv(1024)recv_size+=len(recv_data)print(recv_data.decode('utf-8'),end='')else:print()# 粘包问题出现的原因# 1、tcp是流式协议,数据像水流一样粘在一起,没有任何边界区分# 2、收数据没收干净,有残留,就会下一次结果混淆在一起# 解决的核心法门就是:每次都收干净,不要任何残留
socketserver模块
基于tcp协议的使用
服务端、支持并发
import socketserverclass MyRequestHandle(socketserver.BaseRequestHandler):def handle(self):# 如果tcp协议,self.request=>connprint(self.client_address)while True:try:msg = self.request.recv(1024)if len(msg) == 0: breakself.request.send(msg.upper())except Exception:breakself.request.close()# 服务端应该做两件事# 第一件事:循环地从半连接池中取出链接请求与其建立双向链接,拿到链接对象# 实例化得到一个tcp连接的对象,Threading意思是,只要来了请求,它自动开一个线程来处理并进行交互# 第一个参数是绑定的地址,第二个参数的传一个类s=socketserver.ThreadingTCPServer(('127.0.0.1',8889),MyRequestHandle)s.serve_forever()# 等同于# while True:# conn,client_addr=server.accept()# 启动一个线程(conn,client_addr)# 第二件事:拿到链接对象,与其进行通信循环===>handle
客户端
from socket import *client=socket(AF_INET,SOCK_STREAM)client.connect(('127.0.0.1',8889))while True:msg=input('请输入命令>>:').strip()if len(msg) == 0:continueclient.send(msg.encode('utf-8'))res=client.recv(1024)print(res.decode('utf-8'))
基于udp协议
服务端
import socketserverclass MyRequestHanlde(socketserver.BaseRequestHandler):def handle(self):client_data=self.request[0]server=self.request[1]client_address=self.client_addressprint('客户端发来的数据%s' %client_data)server.sendto(client_data.upper(),client_address)s=socketserver.ThreadingUDPServer(("127.0.0.1",8888),MyRequestHanlde)s.serve_forever()# 相当于:只负责循环地收# while True:# data,client_addr=server.recvfrom(1024)# 启动一个线程处理后续的事情(data,client_addr)
客户端
import socketclient=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # 流式协议=》tcp协议while True:msg=input('>>>: ').strip()client.sendto(msg.encode('utf-8'),('115.29.65.16',8888))res=client.recvfrom(1024)print(res)client.close()
进程
同步异步、阻塞与非阻塞
切换(CPU)分为两种情况1.当一个程序遇到IO操作的时候,操作系统会剥夺该程序的CPU执行权限作用:提高了CPU的利用率 并且也不影响程序的执行效率2.当一个程序长时间占用CPU的时候,操作吸引也会剥夺该程序的CPU执行权限弊端:降低了程序的执行效率(原本时间+切换时间)同步和异步"""描述的是任务的提交方式"""同步:任务提交之后,原地等待任务的返回结果,等待的过程中不做任何事(干等)程序层面上表现出来的感觉就是卡住了异步:任务提交之后,不原地等待任务的返回结果,直接去做其他事情我提交的任务结果如何获取?任务的返回结果会有一个异步回调机制自动处理阻塞非阻塞"""描述的程序的运行状态"""阻塞:阻塞态非阻塞:就绪态、运行态理想状态:我们应该让我们的写的代码永远处于就绪态和运行态之间切换
同步代码
import timedef func():time.sleep(3)print('hello world')if __name__ == '__main__':res = func() # 同步调用print('hahaha')
开启进程的二种方式
第一种直接使用multiprocessing的process创建对象
第二种使用类继承process类,实现run方法
# 第一种(直接使用)from multiprocessing import Processimport timedef task(name):print('%s is running'%name)time.sleep(3)print('%s is over'%name)if __name__ == '__main__':# 1 创建一个对象p = Process(target=task, args=('jason',)) #传入需要执行的函数,及函数参数# 容器类型哪怕里面只有1个元素 建议要用逗号隔开# 2 开启进程p.start() # 告诉操作系统帮你创建一个进程 异步print('主')"""windows操作系统下 创建进程一定要在main内创建因为windows下创建进程类似于模块导入的方式会从上往下依次执行代码linux中则是直接将代码完整的拷贝一份"""# 第二种方式 类的继承from multiprocessing import Processimport timeclass MyProcess(Process): #继承process方法def run(self): #需要实现run方法print('hello bf girl')time.sleep(1)print('get out!')if __name__ == '__main__':p = MyProcess()p.start()print('主')
join方法
start方法:开启p进程
join方法:阻塞创建的p进程运行,主进程等待子进程p运行结束后再继续往后执行。
from multiprocessing import Processimport timedef task(name, n):print('%s is running'%name)time.sleep(n)print('%s is over'%name)if __name__ == '__main__':# p1 = Process(target=task, args=('jason', 1))# p2 = Process(target=task, args=('egon', 2))# p3 = Process(target=task, args=('tank', 3))# start_time = time.time()# p1.start()# p2.start()# p3.start() # 仅仅是告诉操作系统要创建进程# # time.sleep(50000000000000000000)# # p.join() # 主进程等待子进程p运行结束之后再继续往后执行# p1.join()# p2.join()# p3.join()start_time = time.time()p_list = []for i in range(1, 4):p = Process(target=task, args=('子进程%s'%i, i))p.start()p_list.append(p)for p in p_list:p.join()print('主', time.time() - start_time)
进程间数据隔离
进程间数据并不共享
创建进程就是在内存中申请一块内存空间将需要运行的代码丢进去一个进程对应在内存中就是一块独立的内存空间多个进程对应在内存中就是多块独立的内存空间进程与进程之间数据默认情况下是无法直接交互,如果想交互可以借助于第三方工具、模块
from multiprocessing import Processmoney = 100def task():global money # 局部修改全局money = 666print('子',money)if __name__ == '__main__':p = Process(target=task)p.start()p.join()print(money)打印结果:子 666100
进程对象及其他方法
os.getppid() 查看当前父进程的进程号
os.getpid() 查看当前进程号
current_process().pid # 查看当前进程的进程号
p.terminate() # 杀死当前进程
是告诉操作系统帮你去杀死当前进程 但是需要一定的时间 而代码的运行速度极快
p.is_alive() # 判断当前进程是否存活
from multiprocessing import Process, current_processimport timeimport osdef task():# print('%s is running'%current_process().pid) # 查看当前进程的进程号print('%s is running'%os.getpid()) # 查看当前进程的进程号# print('子进程的主进程号%s'%os.getppid()) # 查看当前进程的进程号time.sleep(30)if __name__ == '__main__':p = Process(target=task)p.start()p.terminate() # 杀死当前进程# 是告诉操作系统帮你去杀死当前进程 但是需要一定的时间 而代码的运行速度极快time.sleep(0.1)print(p.is_alive()) # 判断当前进程是否存活"""一般情况下我们会默认将存储布尔值的变量名和返回的结果是布尔值的方法名都起成以is_开头"""print('主')# print('主',current_process().pid)# print('主',os.getpid())# print('主主',os.getppid()) # 获取父进程的pid号
僵尸与孤儿进程
# 僵尸进程"""死了但是没有死透当你开设了子进程之后 该进程死后不会立刻释放占用的进程号因为我要让父进程能够查看到它开设的子进程的一些基本信息 占用的pid号 运行时间。。。所有的进程都会步入僵尸进程父进程不死并且在无限制的创建子进程并且子进程也不结束回收子进程占用的pid号父进程等待子进程运行结束父进程调用join方法"""# 孤儿进程"""子进程存活,父进程意外死亡操作系统会开设一个“儿童福利院”专门管理孤儿进程回收相关资源"""
from multiprocessing import Processimport timedef run():print('hello world')time.sleep(3)print('get out')if __name__ == '__main__':p = Process(target=run)p.start()print('主')
守护进程
1.1、什么是守护进程?
**1、*_守护进程会在主进程代码运行结束的情况下,立即挂掉。*_****2、守护进程本身就是一个子进程。****3、主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,**
p.daemon = True # 将进程p设置成守护进程 这一句一定要放在start方法上面才有效否则会直接报错
p.start()
from multiprocessing import Processimport timedef task(name):print('%s总管正在活着'% name)time.sleep(3)print('%s总管正在死亡' % name)if __name__ == '__main__':p = Process(target=task,args=('egon',))# p = Process(target=task,kwargs={'name':'egon'})p.daemon = True # 将进程p设置成守护进程 这一句一定要放在start方法上面才有效否则会直接报错p.start()print('皇帝jason寿终正寝')
互斥锁
from multiprocessing import Process, Lock
多个进程操作同一份数据的时候,会出现数据错乱的问题
针对上述问题,解决方式就是加锁处理:将并发变成串行,牺牲效率但是保证了数据的安全
mutex = Lock()
mutex.acquire() #进程抢锁
干活,操作同一个数据
mutex.release() #进程释放锁
注意:1.锁不要轻易的使用,容易造成死锁现象(我们写代码一般不会用到,都是内部封装好的)2.锁只在处理数据的部分加来保证数据安全(只在争抢数据的环节加锁处理即可)
from multiprocessing import Process, Lockimport jsonimport timeimport random# 查票def search(i):# 文件操作读取票数with open('data', 'r', encoding='utf8') as f:dic = json.load(f)print('用户%s查询余票:%s'%(i, dic.get('ticket_num')))# 字典取值不要用[]的形式 推荐使用get 你写的代码打死都不能报错!!!# 买票 1.先查 2.再买def buy(i):# 先查票with open('data', 'r', encoding='utf8') as f:dic = json.load(f)# 模拟网络延迟time.sleep(random.randint(1,3))# 判断当前是否有票if dic.get('ticket_num') > 0:# 修改数据库 买票dic['ticket_num'] -= 1# 写入数据库with open('data', 'w', encoding='utf8') as f:json.dump(dic,f)print('用户%s买票成功'%i)else:print('用户%s买票失败'%i)# 整合上面两个函数def run(i, mutex):search(i)# 给买票环节加锁处理# 抢锁mutex.acquire()buy(i)# 释放锁mutex.release()if __name__ == '__main__':# 在主进程中生成一把锁 让所有的子进程抢 谁先抢到谁先买票mutex = Lock()for i in range(1,11):p = Process(target=run, args=(i, mutex))p.start()
进程队列 Queue
队列:先进先出堆栈:先进后出
from multiprocessing import Queueq = Queue() # 括号内可以放数字来限制队列的大小q.put() # 放数据 当队列满了再放 阻塞q.get() # 取数据 当队列空了再取 阻塞q.full() # 判断队列是否满了q.empty() # 判断队列是否空了q.get_nowait() # 取数据的时候如果没有数据直接报错q.get(timeout=5) # 取数据的时候如果没有数据等5s还没有则直接报错
try:v6 = q.get(timeout=3)print(v6)except Exception as e:print('一滴都没有了!')
from multiprocessing import Queue# 创建一个队列q = Queue(5) # 括号内可以传数字 标示生成的队列最大可以同时存放的数据量# 往队列中存数据q.put(111)q.put(222)q.put(333)# print(q.full()) # 判断当前队列是否满了# print(q.empty()) # 判断当前队列是否空了q.put(444)q.put(555)# print(q.full()) # 判断当前队列是否满了# q.put(666) # 当队列数据放满了之后 如果还有数据要放程序会阻塞 直到有位置让出来 不会报错"""存取数据 存是为了更好的取千方百计的存、简单快捷的取同在一个屋檐下差距为何那么大"""# 去队列中取数据v1 = q.get()v2 = q.get()v3 = q.get()v4 = q.get()v5 = q.get()# print(q.empty())# V6 = q.get_nowait() # 没有数据直接报错queue.Empty# v6 = q.get(timeout=3) # 没有数据之后原地等待三秒之后再报错 queue.Emptytry:v6 = q.get(timeout=3)print(v6)except Exception as e:print('一滴都没有了!')# # v6 = q.get() # 队列中如果已经没有数据的话 get方法会原地阻塞# print(v1, v2, v3, v4, v5, v6)"""q.full()q.empty()q.get_nowait()在多进程的情况下是不精确"""
IPC机制-进程+队列
进程间通信
"""进程之间是无法直接进行数据交互的,但是可以通过队列或者管道实现数据交互管道:队列:管道+锁本地测试的时候才可能会用到Queue,实际生产用的都是别人封装好的功能非常强大的工具rediskafkaRQ"""
from multiprocessing import Queue, Process"""研究思路1.主进程跟子进程借助于队列通信2.子进程跟子进程借助于队列通信"""def producer(q):q.put('我是23号技师 很高兴为您服务')def consumer(q):print(q.get())if __name__ == '__main__':q = Queue() #创建一个队列p = Process(target=producer,args=(q,)) #将q队列传入,调用put方法p1 = Process(target=consumer,args=(q,)) #将q队列传入,调用get方法p.start()p1.start()
生产消费者模型
"""生产者:生产/制造东西的消费者:消费/处理东西的该模型除了上述两个之外还需要一个媒介生活中的例子做包子的将包子做好后放在蒸笼(媒介)里面,买包子的取蒸笼里面拿厨师做菜做完之后用盘子装着给你消费者端过去生产者和消费者之间不是直接做交互的,而是借助于媒介做交互生产者(做包子的) + 消息队列(蒸笼) + 消费者(吃包子的)"""
from multiprocessing import Process, Queue, JoinableQueueimport timeimport randomdef producer(name,food,q):for i in range(5):data = '%s生产了%s%s'%(name,food,i)# 模拟延迟time.sleep(random.randint(1,3))print(data)# 将数据放入 队列中q.put(data)def consumer(name,q):# 消费者胃口很大 光盘行动while True:food = q.get() # 没有数据就会卡住# 判断当前是否有结束的标识# if food is None:breaktime.sleep(random.randint(1,3))print('%s吃了%s'%(name,food))q.task_done() # 告诉队列你已经从里面取出了一个数据并且处理完毕了if __name__ == '__main__':# q = Queue()q = JoinableQueue() #每当你往该队列中存入数据的时候 内部会有一个计数器+1p1 = Process(target=producer,args=('大厨egon','包子',q))p2 = Process(target=producer,args=('马叉虫tank','泔水',q))#消费者c1 = Process(target=consumer,args=('春哥',q))c2 = Process(target=consumer,args=('新哥',q))p1.start()p2.start()# 将消费者设置成守护进程c1.daemon = Truec2.daemon = Truec1.start() #开始消费c2.start()p1.join()p2.join()# 等待生产者生产完毕之后 往队列中添加特定的结束符号# q.put(None) # 肯定在所有生产者生产的数据的末尾q.join() # 等待队列中所有的数据被取完再执行往下执行代码"""JoinableQueue 每当你往该队列中存入数据的时候 内部会有一个计数器+1没当你调用task_done的时候 计数器-1q.join() 当计数器为0的时候 才往后运行"""# 只要q.join执行完毕 说明消费者已经处理完数据了 消费者就没有存在的必要了
线程
致命三问
- 什么是线程 ```css “”” 进程:资源单位 线程:执行单位
将操作系统比喻成一个大的工厂 那么进程就相当于工厂里面的车间 而线程就是车间里面的流水线
每一个进程肯定自带一个线程
再次总结: 进程:资源单位(起一个进程仅仅只是在内存空间中开辟一块独立的空间) 线程:执行单位(真正被cpu执行的其实是进程里面的线程,线程指的就是代码的执行过程,执行代码中所需要使用到的资源都找所在的进程索要)
进程和线程都是虚拟单位,只是为了我们更加方便的描述问题 “””
-为何要有线程```python"""开设进程1.申请内存空间 耗资源2.“拷贝代码” 耗资源开线程一个进程内可以开设多个线程,在用一个进程内开设多个线程无需再次申请内存空间操作总结:开设线程的开销要远远的小于进程的开销同一个进程下的多个线程数据是共享的!!!"""我们要开发一款文本编辑器获取用户输入的功能实时展示到屏幕的功能自动保存到硬盘的功能针对上面这三个功能,开设进程还是线程合适???开三个线程处理上面的三个功能更加的合理
- 如何使用
"""进程:资源单位线程:执行单位线程才是真正干活的人,干活的过程中需要的资源由线程所在的进程提供每一个进程肯定都自带一个线程同一个进程内可以创建多个线程""""""开进程申请内存空间”拷贝代码“消耗资源较大开线程同一个进程内创建多个线程 无需上述两部操作,消耗资源相对较小智商情商搜商"""
今日内容概要
- 开启线程的两种方式
- TCP服务端实现并发的效果
- 线程对象的join方法
- 线程间数据共享
- 线程对象属性及其他方法
- 守护线程
- 线程互斥锁
- GIL全局解释器锁
- 多进程与多线程的实际应用场景
开启线程的两种方式
与开启进程方式类似,直接使用thread创建或者类继承
# from multiprocessing import Process# from threading import Thread# import time### def task(name):# print('%s is running'%name)# time.sleep(1)# print('%s is over'%name)### # 开启线程不需要在main下面执行代码 直接书写就可以# # 但是我们还是习惯性的将启动命令写在main下面# t = Thread(target=task,args=('egon',))# # p = Process(target=task,args=('jason',))# # p.start()# t.start() # 创建线程的开销非常小 几乎是代码一执行线程就已经创建了# print('主')from threading import Threadimport timeclass MyThead(Thread):def __init__(self, name):"""针对刷个下划线开头双下滑线结尾(__init__)的方法 统一读成 双下init"""# 重写了别人的方法 又不知道别人的方法里有啥 你就调用父类的方法super().__init__()self.name = namedef run(self):print('%s is running'%self.name)time.sleep(1)print('egon DSB')if __name__ == '__main__':t = MyThead('egon')t.start()print('主')
TCP服务端实现并发
import socketfrom threading import Threadfrom multiprocessing import Process"""服务端1.要有固定的IP和PORT2.24小时不间断提供服务3.能够支持并发从现在开始要养成一个看源码的习惯我们前期要立志称为拷贝忍者 卡卡西 不需要有任何的创新等你拷贝到一定程度了 就可以开发自己的思想了"""server =socket.socket() # 括号内不加参数默认就是TCP协议server.bind(('127.0.0.1',8080))server.listen(5)# 将服务的代码单独封装成一个函数def talk(conn):# 通信循环while True:try:data = conn.recv(1024)# 针对mac linux 客户端断开链接后if len(data) == 0: breakprint(data.decode('utf-8'))conn.send(data.upper())except ConnectionResetError as e:print(e)breakconn.close()# 链接循环while True:conn, addr = server.accept() # 接客# 叫其他人来服务客户# t = Thread(target=talk,args=(conn,))t = Process(target=talk,args=(conn,))t.start()"""客户端"""import socketclient = socket.socket()client.connect(('127.0.0.1',8080))while True:client.send(b'hello world')data = client.recv(1024)print(data.decode('utf-8'))
线程对象的join方法
from threading import Threadimport timedef task(name):print('%s is running'%name)time.sleep(3)print('%s is over'%name)if __name__ == '__main__':t = Thread(target=task,args=('egon',))t.start()t.join() # 主线程等待子线程运行结束再执行print('主')
同一个进程下的多个线程数据是共享的
from threading import Threadimport timemoney = 100def task():global moneymoney = 666print(money)if __name__ == '__main__':t = Thread(target=task)t.start()t.join()print(money)
线程对象属性及其他方法
print('主',active_count()) # 统计当前正在活跃的线程数print('主',os.getpid())print('主',current_thread().name) # 获取线程名字
from threading import Thread, active_count, current_threadimport os,timedef task(n):# print('hello world',os.getpid())print('hello world',current_thread().name)time.sleep(n)if __name__ == '__main__':t = Thread(target=task,args=(1,))t1 = Thread(target=task,args=(2,))t.start()t1.start()t.join()print('主',active_count()) # 统计当前正在活跃的线程数# print('主',os.getpid())# print('主',current_thread().name) # 获取线程名字
守护线程
2.1、什么是守护线程?
**1、*_守护线程会在"该进程内所有非守护线程全部都运行完毕后,守护线程才会挂掉"。并不是主线程运行完毕后守护线程挂掉。这一点是和守护进程的区别之处!*_****_*2、*_*_守护线程守护的是:当前进程内所有的子线程!*_<br />
**
**3、主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。**
t = Thread(target=task,args=(‘egon’,))
t.daemon = True
1.例子:当只有一个子线程并且为守护线程,那么这个守护线程就会等待主线程运行完毕后挂掉# from threading import Thread# import time### def task(name):# print('%s is running'%name)# time.sleep(1)# print('%s is over'%name)### if __name__ == '__main__':# t = Thread(target=task,args=('egon',))# t.daemon = True# t.start()# print('主')"""主线程运行结束之后不会立刻结束 会等待所有其他非守护线程结束才会结束因为主线程的结束意味着所在的进程的结束"""2.例子:当有多个子线程时,守护线程就会等待所有的子线程运行完毕后,守护线程才会挂掉(这一点和主线程是一样的,都是等待所有的子线程运行完毕后才会挂掉)。from threading import Threadimport timedef foo():print(123)time.sleep(1)print('end123')def func():print(456)time.sleep(3)print('end456')if __name__ == '__main__':t1 = Thread(target=foo)t2 = Thread(target=func)t1.daemon = Truet1.start()t2.start()print('主.......')
线程互斥锁
与进程类似
from threading import Thread,Lockimport timemoney = 100mutex = Lock()def task():global moneymutex.acquire()tmp = moneytime.sleep(0.1)money = tmp - 1mutex.release()if __name__ == '__main__':t_list = []for i in range(100):t = Thread(target=task)t.start()t_list.append(t)for t in t_list:t.join()print(money)
GIL全局解释器锁
Ps:博客园密码:xiaoyuanqujing@666
"""In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiplenative threads from executing Python bytecodes at once. This lock is necessary mainlybecause CPython’s memory management is not thread-safe. (However, since the GILexists, other features have grown to depend on the guarantees that it enforces.)""""""python解释器其实有多个版本CpythonJpythonPypypython但是普遍使用的都是CPython解释器在CPython解释器中GIL是一把互斥锁,用来阻止同一个进程下的多个线程的同时执行同一个进程下的多个线程无法利用多核优势!!!疑问:python的多线程是不是一点用都没有???无法利用多核优势因为cpython中的内存管理不是线程安全的内存管理(垃圾回收机制)1.应用计数2.标记清楚3.分代回收""""""重点:1.GIL不是python的特点而是CPython解释器的特点2.GIL是保证解释器级别的数据的安全3.GIL会导致同一个进程下的多个线程的无法同时执行即无法利用多核优势(******)4.针对不同的数据还是需要加不同的锁处理5.解释型语言的通病:同一个进程下多个线程无法利用多核优势"""
GIL与普通互斥锁的区别
from threading import Thread,Lockimport timemutex = Lock()money = 100def task():global money# with mutex:# tmp = money# time.sleep(0.1)# money = tmp -1mutex.acquire()tmp = moneytime.sleep(0.1) # 只要你进入IO了 GIL会自动释放,但是别人没拿到money锁,gil会回来money = tmp - 1mutex.release()if __name__ == '__main__':t_list = []for i in range(100):t = Thread(target=task)t.start()t_list.append(t)for t in t_list:t.join()print(money)"""100个线程起起来之后 要先去抢GIL我进入io GIL自动释放 但是我手上还有一个自己的互斥锁其他线程虽然抢到了GIL但是抢不到互斥锁最终GIL还是回到你的手上 你去操作数据"""
同一个进程下的多线程无法利用多核优势,是不是就没有用了
"""多线程是否有用要看具体情况单核:四个任务(IO密集型\计算密集型)多核:四个任务(IO密集型\计算密集型)"""# 计算密集型 每个任务都需要10s单核(不用考虑了)多进程:额外的消耗资源多线程:介绍开销多核多进程:总耗时 10+多线程:总耗时 40+# IO密集型多核多进程:相对浪费资源多线程:更加节省资源
代码验证-针对io\cpu密集
# 计算密集型# from multiprocessing import Process# from threading import Thread# import os,time### def work():# res = 0# for i in range(10000000):# res *= i## if __name__ == '__main__':# l = []# print(os.cpu_count()) # 获取当前计算机CPU个数# start_time = time.time()# for i in range(12):# p = Process(target=work) # 1.4679949283599854# t = Thread(target=work) # 5.698534250259399# t.start()# # p.start()# # l.append(p)# l.append(t)# for p in l:# p.join()# print(time.time()-start_time)# IO密集型from multiprocessing import Processfrom threading import Threadimport os,timedef work():time.sleep(2)if __name__ == '__main__':l = []print(os.cpu_count()) # 获取当前计算机CPU个数start_time = time.time()for i in range(4000):# p = Process(target=work) # 21.149890184402466t = Thread(target=work) # 3.007986068725586t.start()# p.start()# l.append(p)l.append(t)for p in l:p.join()print(time.time()-start_time)
总结
"""多进程和多线程都有各自的优势并且我们后面在写项目的时候通常可以多进程下面再开设多线程这样的话既可以利用多核也可以介绍资源消耗"""
TCP服务端并发
简单实现多线程tcp
import socketfrom threading import Threaddef communication(conn):while True:try:data = conn.recv(1024)if len(data) == 0: breakconn.send(data.upper())except ConnectionResetError as e:print(e)breakconn.close()def server(ip, port):server = socket.socket()server.bind((ip, port))server.listen(5)while True:conn, addr = server.accept()# 开设多进程或者多线程处理客户端通信t = Thread(target=communication, args=(conn,)) #没接收到一个连接则创建一个线程t.start()if __name__ == '__main__':s = Thread(target=server, args=('127.0.0.1', 8080))s.start()
今日内容概要
- 死锁与递归锁(了解)
- 信号量(了解)
- Event事件(了解)
- 线程q(了解)
- 进程池与线程池(掌握)
- 协程(了解)
- 协程实现TCP服务端的并发效果(了解)
死锁
当你知道锁的使用抢锁必须要释放锁,其实你在操作锁的时候也极其容易产生死锁现象(整个程序卡死 阻塞)
from threading import Thread, Lockimport timemutexA = Lock()mutexB = Lock()# 类只要加括号多次 产生的肯定是不同的对象# 如果你想要实现多次加括号等到的是相同的对象 单例模式class MyThead(Thread):def run(self):self.func1()self.func2()def func1(self):mutexA.acquire() #抢A锁print('%s 抢到A锁'% self.name) # 获取当前线程名mutexB.acquire() #等func2释放B锁,再抢,阻塞在这里print('%s 抢到B锁'% self.name)mutexB.release()mutexA.release()def func2(self):mutexB.acquire() #抢B锁print('%s 抢到B锁'% self.name)time.sleep(2)mutexA.acquire() #等func1释放A锁,再抢,阻塞在这里print('%s 抢到A锁'% self.name) # 获取当前线程名mutexA.release()mutexB.release()if __name__ == '__main__':for i in range(10):t = MyThead()t.start()
递归锁 RLOCK
"""递归锁的特点可以被连续的acquire和release但是只能被第一个抢到这把锁执行上述操作它的内部有一个计数器 每acquire一次计数加一 每realse一次计数减一只要计数不为0 那么其他人都无法抢到该锁"""死锁解决方法:递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:# 类只要加括号多次 产生的肯定是不同的对象# 如果你想要实现多次加括号等到的是相同的对象 单例模式# 将上述的mutexA = Lock()mutexB = Lock()# 换成mutexA = mutexB = RLock()
信号量 Semaphore
信号量在不同的阶段可能对应不同的技术点
在并发编程中信号量指的是锁!!!
"""如果我们将互斥锁比喻成一个厕所的话那么信号量就相当于多个厕所"""from threading import Thread, Semaphoreimport timeimport random"""利用random模块实现打印随机验证码(搜狗的一道笔试题)"""sm = Semaphore(5) # 括号内写数字 写几就表示开设几个坑位def task(name):sm.acquire() #最多可以获取到5把锁,其他的等5把释放后再获取锁,否则会阻塞在这里print('%s 正在蹲坑'% name)time.sleep(random.randint(1, 5))sm.release()if __name__ == '__main__':for i in range(20): #会运行thread20次t = Thread(target=task, args=('伞兵%s号'%i, ))t.start()
Event事件
一些进程/线程需要等待另外一些进程/线程运行完毕之后才能运行,类似于发射信号一样
event = Event() # 造了一个红绿灯
event.set() #运行到,wait开始执行
event.wait() # 等待别人给你发信号
from threading import Thread, Eventimport timeevent = Event() # 造了一个红绿灯def light():print('红灯亮着的')time.sleep(3)print('绿灯亮了')# 告诉等待红灯的人可以走了event.set()def car(name):print('%s 车正在灯红灯'%name)event.wait() # 等待别人给你发信号print('%s 车加油门飙车走了'%name)if __name__ == '__main__':t = Thread(target=light)t.start()for i in range(20):t = Thread(target=car, args=('%s'%i, ))t.start()
线程队列 queue
"""同一个进程下多个线程数据是共享的为什么先同一个进程下还会去使用队列呢因为队列是管道 + 锁所以用队列还是为了保证数据的安全"""import queue #进程使用的是Queue# 我们现在使用的队列都是只能在本地测试使用# 1 队列q 先进先出# q = queue.Queue(3)# q.put(1)# q.get()# q.get_nowait()# q.get(timeout=3)# q.full()# q.empty()# 后进先出q# q = queue.LifoQueue(3) # last in first out# q.put(1)# q.put(2)# q.put(3)# print(q.get()) # 3# 优先级q 你可以给放入队列中的数据设置进出的优先级q = queue.PriorityQueue(4)q.put((10, '111'))q.put((100, '222'))q.put((0, '333'))q.put((-5, '444'))print(q.get()) # (-5, '444')# put括号内放一个元祖 第一个放数字表示优先级# 需要注意的是 数字越小优先级越高!!!
进程池与线程池
先回顾之前TCP服务端实现并发的效果是怎么玩的
每来一个人就开设一个进程或者线程去处理
"""无论是开设进程也好还是开设线程也好 是不是都需要消耗资源只不过开设线程的消耗比开设进程的稍微小一点而已我们是不可能做到无限制的开设进程和线程的 因为计算机硬件的资源更不上!!!硬件的开发速度远远赶不上软件呐我们的宗旨应该是在保证计算机硬件能够正常工作的情况下最大限度的利用它"""# 池的概念"""什么是池?池是用来保证计算机硬件安全的情况下最大限度的利用计算机它降低了程序的运行效率但是保证了计算机硬件的安全 从而让你写的程序能够正常运行"""
基本使用
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutorimport timeimport os# pool = ThreadPoolExecutor(5) # 池子里面固定只有五个线程# 括号内可以传数字 不传的话默认会开设当前计算机cpu个数五倍的线程pool = ProcessPoolExecutor(5)# 括号内可以传数字 不传的话默认会开设当前计算机cpu个数进程"""池子造出来之后 里面会固定存在五个线程这个五个线程不会出现重复创建和销毁的过程池子造出来之后 里面会固定的几个进程这个几个进程不会出现重复创建和销毁的过程池子的使用非常的简单你只需要将需要做的任务往池子中提交即可 自动会有人来服务你"""def task(n):print(n,os.getpid())time.sleep(2)return n**ndef call_back(n):print('call_back>>>:',n.result())"""任务的提交方式同步:提交任务之后原地等待任务的返回结果 期间不做任何事异步:提交任务之后不等待任务的返回结果 执行继续往下执行返回结果如何获取???异步提交任务的返回结果 应该通过回调机制来获取回调机制就相当于给每个异步任务绑定了一个定时炸弹一旦该任务有结果立刻触发爆炸"""if __name__ == '__main__':# pool.submit(task, 1) # 朝池子中提交任务 异步提交# print('主')t_list = []for i in range(20): # 朝池子中提交20个任务# res = pool.submit(task, i) # <Future at 0x100f97b38 state=running>res = pool.submit(task, i).add_done_callback(call_back) #提交一个任务并绑定回调函数# print(res.result()) # result方法 同步提交# t_list.append(res)# 等待线程池中所有的任务执行完毕之后再继续往下执行# pool.shutdown() # 关闭线程池 等待线程池中所有的任务运行完毕# for t in t_list:# print('>>>:',t.result()) # 肯定是有序的"""程序有并发变成了串行任务的为什么打印的是Noneres.result() 拿到的就是异步提交的任务的返回结果"""
总结
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutorpool = ProcessPoolExecutor(5)pool.submit(task, i).add_done_callback(call_back)
协程
"""进程:资源单位线程:执行单位协程:这个概念完全是程序员自己意淫出来的 根本不存在单线程下实现并发我们程序员自己再代码层面上检测我们所有的IO操作一旦遇到IO了 我们在代码级别完成切换这样给CPU的感觉是你这个程序一直在运行 没有IO从而提升程序的运行效率多道技术切换+保存状态CPU两种切换1.程序遇到IO2.程序长时间占用TCP服务端acceptrecv代码如何做到切换+保存状态切换切换不一定是提升效率 也有可能是降低效率IO切 提升没有IO切 降低保存状态保存上一次我执行的状态 下一次来接着上一次的操作继续往后执行yield"""
验证协程切换是否就一定提升效率
协程切换并不串行快,如果有io再切换会更快
# import time## # 串行执行计算密集型的任务 用时1.2372429370880127# def func1():# for i in range(10000000):# i + 1## def func2():# for i in range(10000000):# i + 1## start_time = time.time()# func1()# func2()# print(time.time() - start_time)# 切换 + yield 用时:2.1247239112854004# import time### def func1():# while True:# 10000000 + 1# yield### def func2():# g = func1() # 先初始化出生成器# for i in range(10000000):# i + 1# next(g) #执行下一次func1## start_time = time.time()# func2()# print(time.time() - start_time)
gevent 检测io
安装
pip3 install geventfrom gevent import monkey;monkey.patch_all()import timefrom gevent import spawn"""gevent模块本身无法检测常见的一些io操作在使用的时候需要你额外的导入一句话from gevent import monkeymonkey.patch_all()又由于上面的两句话在使用gevent模块的时候是肯定要导入的所以还支持简写from gevent import monkey;monkey.patch_all()"""def heng():print('哼')time.sleep(2)print('哼')def ha():print('哈')time.sleep(3)print('哈')def heiheihei():print('heiheihei')time.sleep(5)print('heiheihei')start_time = time.time()g1 = spawn(heng) #创建一个普通的Greenlet对象并切换;遇到io就切换g2 = spawn(ha)g3 = spawn(heiheihei)g1.join()g2.join() # 等待被检测的任务执行完毕 再往后继续执行g3.join()# heng()# ha()# print(time.time() - start_time) # 5.005702018737793print(time.time() - start_time) # 3.004199981689453 5.005439043045044
协程实现TCP服务端的并发
# 服务端from gevent import monkey;monkey.patch_all()import socketfrom gevent import spawndef communication(conn):while True:try:data = conn.recv(1024)if len(data) == 0: breakconn.send(data.upper())except ConnectionResetError as e:print(e)breakconn.close()def server(ip, port):server = socket.socket()server.bind((ip, port))server.listen(5)while True:conn, addr = server.accept()spawn(communication, conn) #comunication是函数,conn是函数参数if __name__ == '__main__':g1 = spawn(server, '127.0.0.1', 8080)g1.join()# 客户端from threading import Thread, current_threadimport socketdef x_client():client = socket.socket()client.connect(('127.0.0.1',8080))n = 0while True:msg = '%s say hello %s'%(current_thread().name,n)n += 1client.send(msg.encode('utf-8'))data = client.recv(1024)print(data.decode('utf-8'))if __name__ == '__main__':for i in range(500):t = Thread(target=x_client)t.start()
总结
"""理想状态:我们可以通过多进程下面开设多线程多线程下面再开设协程序从而使我们的程序执行效率提升"""
IO模型
IO模型简介
"""我们这里研究的IO模型都是针对网络IO的Stevens在文章中一共比较了五种IO Model:* blocking IO 阻塞IO* nonblocking IO 非阻塞IO* IO multiplexing IO多路复用* signal driven IO 信号驱动IO* asynchronous IO 异步IO由signal driven IO(信号驱动IO)在实际中并不常用,所以主要介绍其余四种IO Model。"""#1)等待数据准备 (Waiting for the data to be ready)#2)将数据从内核拷贝到进程中(Copying the data from the kernel to the process)同步异步阻塞非阻塞常见的网络阻塞状态:acceptrecvrecvfromsend虽然它也有io行为 但是不在我们的考虑范围
阻塞IO模型
"""我们之前写的都是阻塞IO模型 协程除外"""import socketserver = socket.socket()server.bind(('127.0.0.1',8080))server.listen(5)while True:conn, addr = server.accept()while True:try:data = conn.recv(1024)if len(data) == 0:breakprint(data)conn.send(data.upper())except ConnectionResetError as e:breakconn.close()# 在服务端开设多进程或者多线程 进程池线程池 其实还是没有解决IO问题该等的地方还是得等 没有规避只不过多个人等待的彼此互不干扰
非阻塞IO setblocking
需要不断循环遍历conn连接对象
"""要自己实现一个非阻塞IO模型"""import socketimport timeserver = socket.socket()server.bind(('127.0.0.1', 8081))server.listen(5)server.setblocking(False)# 将所有的网络阻塞变为非阻塞r_list = []del_list = []while True:try:conn, addr = server.accept()r_list.append(conn)except BlockingIOError:# time.sleep(0.1)# print('列表的长度:',len(r_list))# print('做其他事')for conn in r_list:try:data = conn.recv(1024) # 没有消息 报错if len(data) == 0: # 客户端断开链接conn.close() # 关闭conn# 将无用的conn从r_list删除del_list.append(conn)continueconn.send(data.upper())except BlockingIOError:continueexcept ConnectionResetError:conn.close()del_list.append(conn)# 挥手无用的链接for conn in del_list:r_list.remove(conn)del_list.clear()# 客户端import socketclient = socket.socket()client.connect(('127.0.0.1',8081))while True:client.send(b'hello world')data = client.recv(1024)print(data)
总结
"""虽然非阻塞IO给你的感觉非常的牛逼但是该模型会 长时间占用着CPU并且不干活 让CPU不停的空转我们实际应用中也不会考虑使用非阻塞IO模型任何的技术点都有它存在的意义实际应用或者是思想借鉴"""
IO多路复用 select
"""当监管的对象只有一个的时候 其实IO多路复用连阻塞IO都比比不上!!!但是IO多路复用可以一次性监管很多个对象server = socket.socket()conn,addr = server.accept()监管机制是操作系统本身就有的 如果你想要用该监管机制(select)需要你导入对应的select模块"""import socketimport selectserver = socket.socket()server.bind(('127.0.0.1',8080))server.listen(5)server.setblocking(False)read_list = [server]while True:r_list, w_list, x_list = select.select(read_list, [], []) #read_list = [server]"""帮你监管一旦有人来了 立刻给你返回对应的监管对象"""# print(res) # ([<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080)>], [], [])# print(server)# print(r_list)for i in r_list: #"""针对不同的对象做不同的处理"""if i is server:conn, addr = i.accept()# 也应该添加到监管的队列中read_list.append(conn)else:res = i.recv(1024)if len(res) == 0:i.close()# 将无效的监管对象 移除read_list.remove(i)continueprint(res)i.send(b'heiheiheiheihei')# 客户端import socketclient = socket.socket()client.connect(('127.0.0.1',8080))while True:client.send(b'hello world')data = client.recv(1024)print(data)
监管机制:select 、poll、epoll
"""监管机制其实有很多select机制 windows linux都有poll机制 只在linux有 poll和select都可以监管多个对象 但是poll监管的数量更多上述select和poll机制其实都不是很完美 当监管的对象特别多的时候可能会出现 极其大的延时响应epoll机制 只在linux有它给每一个监管对象都绑定一个回调机制一旦有响应 回调机制立刻发起提醒针对不同的操作系统还需要考虑不同检测机制 书写代码太多繁琐有一个人能够根据你跑的平台的不同自动帮你选择对应的监管机制selectors模块"""
异步IO asyncio
"""异步IO模型是所有模型中效率最高的 也是使用最广泛的相关的模块和框架模块:asyncio模块异步框架:sanic tronado twisted速度快!!!"""import threadingimport asyncio@asyncio.coroutinedef hello():print('hello world %s'%threading.current_thread())yield from asyncio.sleep(1) # 换成真正的IO操作print('hello world %s' % threading.current_thread())loop = asyncio.get_event_loop()tasks = [hello(),hello()]loop.run_until_complete(asyncio.wait(tasks))loop.close()
四个IO模型对比
参考博客园图解,稍微了解即可
网络并发知识点梳理
软件开发架构
互联网协议
"""osi七层五层每一层都是干嘛的以太网协议 广播风暴IP协议TCP/UDP"""
三次握手四次挥手
socket简介
TCP黏包问题 定制固定长度的报头
UDP协议
socketserver模块
操作系统发展史
多道技术
进程理论
开启进程的两种方式
互斥锁
生产者消费者模型
线程理论
开启线程的两种方式
GIL全局解释器锁
进程池线程池
协程的概念
IO模型的了解
