网络知识

CS架构与BS架构

  1. Client<===========>Server
  2. 客户端软件send 服务端软件recv
  3. 操作系统 操作系统
  4. 计算机硬件<====物理介质=====>计算机硬件
  5. Browser<===========>Server
  1. # 2、网络通信
  2. 网络存在的意义就是跨地域数据传输=》称之为通信
  3. 网络=物理链接介质+互联网通信协议
  4. # 3、OSI七层协议
  5. 五层协议
  6. 应用层
  7. 传输层
  8. 网络层
  9. 数据链路层
  10. 物理层
  11. 协议:规定数据的组织格式
  12. 格式:头部+数据部分
  13. 封包裹的过程:数据外加头
  14. 拆包裹的过程:拆掉头获取数据

五层协议

  1. #4、五层协议
  2. 计算机1 计算机2
  3. 应用层 应用层
  4. 传输层 传输层
  5. 网络层 网络层
  6. 数据链路层 数据链路层
  7. 物理层 <===========交互机===========> 物理层
  8. 0101010101010
  9. (源mac地址,目标mac地址)(源ip地址,目标ip地址)数据
  10. #4.1 物理层负责发送电信号
  11. 一组物理层数据称之为:位
  12. 单纯的电信号毫无意义,必须对其进行分组
  13. #4.2 数据链路层:ethernet以太网协议
  14. 规定1:一组数据称之为一个数据帧
  15. 规定2:数据帧分成两部分=》头+数据
  16. 头包含:源地址与目标地址,该地址是mac地址
  17. 数据包含:包含的是网络层发过来的整体的内容
  18. 规定3:规定但凡接入互联网的主机必须有一块网卡,每块网卡在出厂时都烧制好一个全世界独一无二的地址,该地址称之为mac地址
  19. 注意:计算机通信基本靠吼,即以太网协议的工作方式是广播
  20. egon,血嫌弃)(帮我买包子)

网络层

  1. #4.3 网络层:IP协议
  2. 要达到的目的:
  3. 划分广播域
  4. 每一个广播域但凡要接通外部,一定要有一个网关帮内部的计算机转发包到公网
  5. 网关与外界通信走的是路由协议
  6. 规定1:一组数据称之为一个数据包
  7. 规定2:数据帧分成两部分=》头+数据
  8. 头包含:源地址与目标地址,该地址是IP地址
  9. 数据包含的:传输层发过来的整体的内容
  10. ipv4地址:
  11. 8bit.8bit.8bit.8bit
  12. 0.0.0.0
  13. 255.255.255.255
  14. 子网掩码:
  15. 8bit.8bit.8bit.8bit
  16. 255.255.255.0对应的二进制表达
  17. 11111111.11111111.11111111.00000000
  18. 一个合法的ipv4地址组成部分=ip地址/子网掩码地址
  19. 172.16.10.1/255.255.255.0
  20. 172.16.10.1/24
  21. 计算机1
  22. 172.16.10.1 10101100.00010000.00001010.000000001
  23. 255255.255.255.0: 11111111.11111111.11111111.000000000
  24. 172.16.10.0: 10101100.00010000.00001010.000000000
  25. 计算机2
  26. 172.16.10.2 10101100.00010000.00001010.000000010
  27. 255.255.255.255.0: 11111111.11111111.11111111.000000000
  28. 172.16.10.0: 10101100.00010000.00001010.000000000
  29. 计算机1 计算机2
  30. 应用层 应用层
  31. 传输层 传输层
  32. 网络层 网络层
  33. 数据链路层 数据链路层
  34. 物理层 <=========二层交互机========> 物理层
  35. 0101010101010
  36. (源mac地址,xxxx)(源ip地址,目标ip地址)数据
  37. (源mac地址,网关的mac地址)(172.16.10.10/24101.100.200.11/10)数据
  38. 事先知道的是对方的ip地址
  39. 但是计算机的底层通信是基于ethernet以太网协议的mac地址通信
  40. ARP
  41. 所以必须能够将ip地址解析成mac地址
  42. # 两台计算机在同一个局域网内
  43. 计算机1172.16.10.10/24 直接 计算机2172.16.10.11/24
  44. ARP
  45. 自己的ip,对方的ip
  46. 1、计算二者网络地址,如果一样,拿到计算机2mac地址就可以了
  47. 2、发送广播包
  48. 发送端mac FF:FF:FF:FF:FF:FF 172.16.10.10/24 172.16.10.11/24 数据
  49. # 两台计算机不在同一个局域网内
  50. 计算机1172.16.10.10/24 网关 计算机2101.100.200.11/10
  51. ARP
  52. 自己的ip,对方的ip
  53. 1、计算机二者网络地址,如果不一样,应该拿到网关的mac地址
  54. 2、发送广播包
  55. 发送端mac FF:FF:FF:FF:FF:FF 172.16.10.10/24 172.16.10.1/24 数据
  56. #4.3.1 总结******
  57. ip地址+mac地址=》标识全世界范围内独一无二的一台计算机
  58. 或者:
  59. ip地址=》标识全世界范围内独一无二的一台计算机

传输层

  1. 五层协议
  2. 计算机1 计算机2
  3. 应用层 应用层
  4. socket socket
  5. 传输层 传输层
  6. 网络层 网络层
  7. 数据链路层 数据链路层
  8. 物理层 <===========交互机===========> 物理层
  9. 客户端软件send 服务端软件recv
  10. 操作系统 操作系统
  11. 计算机硬件<====物理介质=====>计算机硬件
  12. ethernet头+ip头+tcp头+应用层的头+应用层数据
  13. #一:传输层 tcp\udp=》基于端口
  14. 端口范围0-655350-1023为系统占用端口
  15. ip+port=》标识全世界范围内独一无二的一个基于网络通信的应用程序
  16. 基于tcp协议通信之前:必须建立一个双向通信的链接
  17. C-------------------->S
  18. C<--------------------S
  19. 三次握手建立链接:
  20. 建立链接是为了传数据做准备的,三次握手即可
  21. 四次挥手断开链接
  22. 断开链接时,由于链接内有数据传输,所以必须分四次断开
  23. tcp是可靠传输的
  24. 发送数据必须等到对方确认后才算完成,才会将自己内存中的数据清理掉,否则重传
  25. ps:当服务端大量处于TIME_WAIT状态时意味着服务端正在经历高并发
  26. tcp协议的半连接池:
  27. backlog
  28. [链接请求1,链接请求2,链接请求3,链接请求5]
  29. #二:应用层:
  30. 可以自定义协议=》头部+数据部分
  31. 自定义协议需要注意的问题:
  32. 1、两大组成部分=头部+数据部分
  33. 头部:放对数据的描述信息
  34. 比如:数据要发给谁,数据的类型,数据的长度
  35. 数据部分:想要发的数据
  36. 2、头部的长度必须固定
  37. 因为接收端要通过头部获取所接接收数据的详细信息
  38. http https ftp
  39. www.163.com.
  40. 三:socket介绍
  41. import socket

网络编程socket

基于tcp协议的套接字通信

服务端

  1. import socket
  2. # 1、买手机
  3. phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM) # socket.AF_INET用于跨网络通信,socket.SOCK_STREAM流式协议=》tcp协议
  4. # 2、绑定手机卡
  5. phone.bind(('127.0.0.1',8081)) # 0-65535, 1024以前的都被系统保留使用
  6. # 3、开机
  7. phone.listen(5) # 5指的是半连接池的大小
  8. print('服务端启动完成,监听地址为:%s:%s' %('127.0.0.1',8080))
  9. # 4、等待电话连接请求:拿到电话连接conn
  10. conn,client_addr=phone.accept() #获取连接对象及连接的ip地址
  11. # print(conn)
  12. print("客户端的ip和端口:",client_addr)
  13. # 5、通信:收\发消息
  14. data=conn.recv(1024) # 最大接收的数据量为1024Bytes,收到的是bytes类型
  15. print("客户端发来的消息:",data.decode('utf-8'))
  16. conn.send(data.upper())
  17. # 6、关闭电话连接conn(必选的回收资源的操作)
  18. conn.close()
  19. # 7、关机(可选操作)
  20. phone.close()

客户端

  1. import socket
  2. #1、买手机
  3. phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM) # socket.AF_INET用于跨网络通信,socket.SOCK_STREAM流式协议=》tcp协议
  4. #2、拨通服务端电话
  5. phone.connect(('127.0.0.1',8081))
  6. #3、通信
  7. import time
  8. time.sleep(10)
  9. phone.send('hello egon 哈哈哈'.encode('utf-8'))
  10. data=phone.recv(1024)
  11. print(data.decode('utf-8'))
  12. #4、关闭连接(必选的回收资源的操作)
  13. phone.close()

服务端加上通信循环

服务端

  1. import socket
  2. # 1、买手机
  3. phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM) # 流式协议=》tcp协议
  4. # 2、绑定手机卡
  5. phone.bind(('127.0.0.1',8083)) # 0-65535, 1024以前的都被系统保留使用
  6. # 3、开机
  7. phone.listen(5) # 5指的是半连接池的大小
  8. print('服务端启动完成,监听地址为:%s:%s' %('127.0.0.1',8080))
  9. # 4、等待电话连接请求:拿到电话连接conn
  10. conn,client_addr=phone.accept()
  11. # 5、通信循环:收\发消息,只有一层while true
  12. while True:
  13. try:
  14. data=conn.recv(1024) # 最大接收的数据量为1024Bytes,收到的是bytes类型
  15. if len(data) == 0:
  16. # 在unix系统洗,一旦data收到的是空
  17. # 意味着是一种异常的行为:客户度非法断开了链接
  18. break
  19. print("客户端发来的消息:",data.decode('utf-8'))
  20. conn.send(data.upper())
  21. except Exception:
  22. # 针对windows系统
  23. break
  24. # 6、关闭电话连接conn(必选的回收资源的操作)
  25. conn.close()
  26. # 7、关机(可选操作)
  27. phone.close()

客户端

  1. import socket
  2. #1、买手机
  3. phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM) # 流式协议=》tcp协议
  4. #2、拨通服务端电话
  5. phone.connect(('127.0.0.1',8083))
  6. #3、通信
  7. while True:
  8. msg=input("输入要发送的消息>>>: ").strip() #msg=''
  9. if len(msg) == 0:continue
  10. phone.send(msg.encode('utf-8'))
  11. print('======?')
  12. data=phone.recv(1024)
  13. print(data.decode('utf-8'))
  14. #4、关闭连接(必选的回收资源的操作)
  15. phone.close()

服务端加上链接循环

  1. # 服务端应该满足的特点:
  2. # 1、一直提供服务
  3. # 2、并发地提供服务
  4. import socket
  5. # 1、买手机
  6. phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM) # 流式协议=》tcp协议
  7. # 2、绑定手机卡
  8. phone.bind(('127.0.0.1',8080)) # 0-65535, 1024以前的都被系统保留使用
  9. # 3、开机
  10. phone.listen(5) # 5指的是半连接池的大小
  11. print('服务端启动完成,监听地址为:%s:%s' %('127.0.0.1',8080))
  12. # 4、等待电话连接请求:拿到电话连接conn
  13. # 加上链接循环
  14. while True:
  15. conn,client_addr=phone.accept() #链接循环,整个处理消息的过程是单线程的。
  16. # 5、通信:收\发消息
  17. while True:
  18. try:
  19. data=conn.recv(1024) # 最大接收的数据量为1024Bytes,收到的是bytes类型
  20. if len(data) == 0:
  21. # 在unix系统洗,一旦data收到的是空
  22. # 意味着是一种异常的行为:客户度非法断开了链接
  23. break
  24. print("客户端发来的消息:",data.decode('utf-8'))
  25. conn.send(data.upper())
  26. except Exception:
  27. # 针对windows系统
  28. break
  29. # 6、关闭电话连接conn(必选的回收资源的操作)
  30. conn.close()
  31. # 7、关机(可选操作)
  32. phone.close()

基于udp协议的套接字通信

服务端

  1. import socket
  2. server=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # 数据报协议=》udp协议
  3. server.bind(('127.0.0.1',8081))
  4. while True:
  5. data,client_addr=server.recvfrom(1024)
  6. server.sendto(data.upper(),client_addr)
  7. server.close()

客户端

  1. import socket
  2. client=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # 流式协议=》tcp协议
  3. while True:
  4. msg=input('>>>: ').strip()
  5. client.sendto(msg.encode('utf-8'),('127.0.0.1',8081))
  6. res=client.recvfrom(1024)
  7. print(res)
  8. client.close()

基于tcp协议远程执行命令

服务端

  1. # 服务端应该满足两个特点:
  2. # 1、一直对外提供服务
  3. # 2、并发地服务多个客户端
  4. import subprocess
  5. from socket import *
  6. server=socket(AF_INET,SOCK_STREAM)
  7. server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #就是它,在bind前加,针对windows系统实现端口重用
  8. server.bind(('127.0.0.1',8082))
  9. server.listen(5)
  10. # 服务端应该做两件事
  11. # 第一件事:循环地从板连接池中取出链接请求与其建立双向链接,拿到链接对象
  12. while True:
  13. conn,client_addr=server.accept()
  14. # 第二件事:拿到链接对象,与其进行通信循环
  15. while True:
  16. try:
  17. cmd=conn.recv(1024) #当发送的字节数超过1024会产生粘包问题
  18. if len(cmd) == 0:break #客户端发送cmd命令给服务端执行
  19. obj=subprocess.Popen(cmd.decode('utf-8'), #开启子进程执行命令
  20. shell=True,
  21. stdout=subprocess.PIPE,
  22. stderr=subprocess.PIPE
  23. )
  24. stdout_res=obj.stdout.read()
  25. stderr_res=obj.stderr.read()
  26. print(len(stdout_res)+len(stderr_res))
  27. # conn.send(stdout_res+stderr_res) # ???
  28. conn.send(stdout_res)
  29. conn.send(stderr_res)
  30. # with open("1.mp4",mode='rb') as f:
  31. # for line in f:
  32. # conn.send(line)
  33. except Exception:
  34. break
  35. conn.close()

客户端

  1. from socket import *
  2. client=socket(AF_INET,SOCK_STREAM)
  3. client.connect(('127.0.0.1',8082))
  4. while True:
  5. cmd=input('请输入命令>>:').strip()
  6. if len(cmd) == 0:continue
  7. client.send(cmd.encode('utf-8'))
  8. # 解决粘包问题思路:
  9. # 1、拿到数据的总大小total_size
  10. # 2、recv_size=0,循环接收,每接收一次,recv_size+=接收的长度
  11. # 3、直到recv_size=total_size,结束循环
  12. cmd_res=client.recv(1024) # 本次接收,最大接收1024Bytes
  13. print(cmd_res.decode('utf-8')) # 强调:windows系统用gbk
  14. # 粘包问题出现的原因
  15. # 1、tcp是流式协议,数据像水流一样粘在一起,没有任何边界区分
  16. # 2、收数据没收干净,有残留,就会下一次结果混淆在一起
  17. # 解决的核心法门就是:每次都收干净,不要任何残留

udp协议没有粘包问题

服务端

  1. import socket
  2. server=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
  3. server.bind(('127.0.0.1',8080))
  4. res1=server.recvfrom(2) # b"hello"
  5. print(res1)
  6. res2=server.recvfrom(3) # b"world"
  7. print(res2)
  8. server.close()

客户端

  1. import socket
  2. client=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
  3. client.sendto(b'hello',('127.0.0.1',8080))
  4. client.sendto(b'world',('127.0.0.1',8080))
  5. client.close()

解决粘包问题

服务端

  1. # 服务端应该满足两个特点:
  2. # 1、一直对外提供服务
  3. # 2、并发地服务多个客户端
  4. import subprocess
  5. import struct
  6. from socket import *
  7. server=socket(AF_INET,SOCK_STREAM)
  8. server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #就是它,在bind前加
  9. server.bind(('127.0.0.1',8083))
  10. server.listen(5)
  11. # 服务端应该做两件事
  12. # 第一件事:循环地从板连接池中取出链接请求与其建立双向链接,拿到链接对象
  13. while True:
  14. conn,client_addr=server.accept()
  15. # 第二件事:拿到链接对象,与其进行通信循环
  16. while True:
  17. try:
  18. cmd=conn.recv(1024)
  19. if len(cmd) == 0:break
  20. obj=subprocess.Popen(cmd.decode('utf-8'),
  21. shell=True,
  22. stdout=subprocess.PIPE,
  23. stderr=subprocess.PIPE
  24. )
  25. stdout_res=obj.stdout.read()
  26. stderr_res=obj.stderr.read()
  27. total_size=len(stdout_res)+len(stderr_res)
  28. # 1、先发头信息(固定长度的bytes):对数据描述信息
  29. # int->固定长度的bytes
  30. header=struct.pack('i',total_size) #将数据打包成整数类型二进制
  31. conn.send(header) #发送总数据长度的二进制格式
  32. # 2、再发真实的数据
  33. conn.send(stdout_res) #将执行结果发送给客户端
  34. conn.send(stderr_res) #将执行错误信息发送给客户端
  35. except Exception:
  36. break
  37. conn.close()

客户端

  1. import struct
  2. from socket import *
  3. client=socket(AF_INET,SOCK_STREAM)
  4. client.connect(('127.0.0.1',8083))
  5. while True:
  6. cmd=input('请输入命令>>:').strip()
  7. if len(cmd) == 0:continue
  8. client.send(cmd.encode('utf-8'))
  9. # 解决粘包问题思路:
  10. # 一、先收固定长度的头:解析出数据的描述信息,包括数据的总大小total_size
  11. header=client.recv(4)
  12. total_size=struct.unpack('i',header)[0] #i代表interger整数类型
  13. # 二、根据解析出的描述信息,接收真实的数据
  14. # 2、recv_size=0,循环接收,每接收一次,recv_size+=接收的长度
  15. # 3、直到recv_size=total_size,结束循环
  16. recv_size = 0
  17. while recv_size < total_size:
  18. recv_data=client.recv(1024)
  19. recv_size+=len(recv_data)
  20. print(recv_data.decode('utf-8'),end='')
  21. else:
  22. print()
  23. # 粘包问题出现的原因
  24. # 1、tcp是流式协议,数据像水流一样粘在一起,没有任何边界区分
  25. # 2、收数据没收干净,有残留,就会下一次结果混淆在一起
  26. # 解决的核心法门就是:每次都收干净,不要任何残留

解决粘包问题(终极版)

struct.pack

res=struct.pack(‘i’,999999999) #对999999999以内数字封包都是4个字节长度。

  1. import json
  2. import struct
  3. header_dic={
  4. "filename":"a.txt",
  5. "total_size":12312312122222222222222222222222223123123123123121222222222222222222222222231231231231231212222222222222222222222222312312312312312122222222222222222222222223123123123123121222222222222222222222222231231231231231212222222222222222222222222312312312312312122222222222222222222222223123123123123121222222222222222222222222231231231231231212222222222222222222222222312312312312312122222222222222222222222223123123123123121222222222222222222222222231231231231231212222222222222222222222222312312312312312122222222222222222222222223123123123123121222222222222222222222222231231231231231212222222222222222222222222312312312311231231212222222222222222222222222312312312312312122222222222222222222222223123123123123121222222222222222222222222231231231231231212222222222222222222222222312312312312312122222222222222222222222223123123123123121222222222222222222222222231231231231231212222222222222222222222222312312312312312122222222222222222222222223123123123123121222222222222222222222222231231231231231212222222222222222222222222312312312312312122222222222222222222222223123123123123121222222222222222222222222231231231231231212222222222222222222222222312312312312312122222222222222222222222223123123123123121222222222222222222222222231231231231231212222222222222222222222222312312312312312122222222222222222222222223123123123123121222222222222222222222222231231231231231212222222222222222222222222312312312312312122222222222222222222222223123123123123121222222222222222222222222231231232312122222222222222222222222223123123,
  6. "md5":"123123xi12ix12",
  7. "asf":"123123xi12ix12",
  8. "asf1":"123123xi12ix12",
  9. "asf2":"123123xi12ix12",
  10. "asf3":"123123xi12ix12",
  11. }
  12. json_str=json.dumps(header_dic)
  13. json_str_bytes=json_str.encode('utf-8')
  14. print(len(json_str_bytes))
  15. res=struct.pack('i',999999999) #对999999999以内数字封包都是4个字节长度。
  16. print(res,len(res))
  17. res=struct.pack('i',len(json_str_bytes))

服务端

  1. # 服务端应该满足两个特点:
  2. # 1、一直对外提供服务
  3. # 2、并发地服务多个客户端
  4. import subprocess
  5. import struct
  6. import json
  7. from socket import *
  8. server=socket(AF_INET,SOCK_STREAM)
  9. server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #就是它,在bind前加
  10. server.bind(('127.0.0.1',8083))
  11. server.listen(5)
  12. # 服务端应该做两件事
  13. # 第一件事:循环地从板连接池中取出链接请求与其建立双向链接,拿到链接对象
  14. while True:
  15. conn,client_addr=server.accept()
  16. # 第二件事:拿到链接对象,与其进行通信循环
  17. while True:
  18. try:
  19. cmd=conn.recv(1024)
  20. if len(cmd) == 0:break
  21. obj=subprocess.Popen(cmd.decode('utf-8'),
  22. shell=True,
  23. stdout=subprocess.PIPE,
  24. stderr=subprocess.PIPE
  25. )
  26. stdout_res=obj.stdout.read()
  27. stderr_res=obj.stderr.read()
  28. total_size=len(stdout_res)+len(stderr_res)
  29. # 1、制作头
  30. header_dic={
  31. "filename":"a.txt",
  32. "total_size":total_size,
  33. "md5":"123123xi12ix12"
  34. }
  35. json_str = json.dumps(header_dic)
  36. json_str_bytes = json_str.encode('utf-8')
  37. # 2、先把头的长度发过去
  38. x=struct.pack('i',len(json_str_bytes))
  39. conn.send(x)
  40. # 3、发头信息
  41. conn.send(json_str_bytes)
  42. # 4、再发真实的数据
  43. conn.send(stdout_res)
  44. conn.send(stderr_res)
  45. except Exception:
  46. break
  47. conn.close()

客户端

  1. import struct
  2. import json
  3. from socket import *
  4. client=socket(AF_INET,SOCK_STREAM)
  5. client.connect(('127.0.0.1',8083))
  6. while True:
  7. cmd=input('请输入命令>>:').strip()
  8. if len(cmd) == 0:continue
  9. client.send(cmd.encode('utf-8'))
  10. # 接收端
  11. # 1、先手4个字节,从中提取接下来要收的头的长度
  12. x=client.recv(4)
  13. header_len=struct.unpack('i',x)[0]
  14. # 2、接收头,并解析
  15. json_str_bytes=client.recv(header_len)
  16. json_str=json_str_bytes.decode('utf-8')
  17. header_dic=json.loads(json_str)
  18. print(header_dic)
  19. total_size=header_dic["total_size"]
  20. # 3、接收真实的数据
  21. recv_size = 0
  22. while recv_size < total_size:
  23. recv_data=client.recv(1024)
  24. recv_size+=len(recv_data)
  25. print(recv_data.decode('utf-8'),end='')
  26. else:
  27. print()
  28. # 粘包问题出现的原因
  29. # 1、tcp是流式协议,数据像水流一样粘在一起,没有任何边界区分
  30. # 2、收数据没收干净,有残留,就会下一次结果混淆在一起
  31. # 解决的核心法门就是:每次都收干净,不要任何残留

socketserver模块

基于tcp协议的使用

服务端、支持并发

  1. import socketserver
  2. class MyRequestHandle(socketserver.BaseRequestHandler):
  3. def handle(self):
  4. # 如果tcp协议,self.request=>conn
  5. print(self.client_address)
  6. while True:
  7. try:
  8. msg = self.request.recv(1024)
  9. if len(msg) == 0: break
  10. self.request.send(msg.upper())
  11. except Exception:
  12. break
  13. self.request.close()
  14. # 服务端应该做两件事
  15. # 第一件事:循环地从半连接池中取出链接请求与其建立双向链接,拿到链接对象
  16. # 实例化得到一个tcp连接的对象,Threading意思是,只要来了请求,它自动开一个线程来处理并进行交互
  17. # 第一个参数是绑定的地址,第二个参数的传一个类
  18. s=socketserver.ThreadingTCPServer(('127.0.0.1',8889),MyRequestHandle)
  19. s.serve_forever()
  20. # 等同于
  21. # while True:
  22. # conn,client_addr=server.accept()
  23. # 启动一个线程(conn,client_addr)
  24. # 第二件事:拿到链接对象,与其进行通信循环===>handle

客户端

  1. from socket import *
  2. client=socket(AF_INET,SOCK_STREAM)
  3. client.connect(('127.0.0.1',8889))
  4. while True:
  5. msg=input('请输入命令>>:').strip()
  6. if len(msg) == 0:continue
  7. client.send(msg.encode('utf-8'))
  8. res=client.recv(1024)
  9. print(res.decode('utf-8'))

基于udp协议

服务端

  1. import socketserver
  2. class MyRequestHanlde(socketserver.BaseRequestHandler):
  3. def handle(self):
  4. client_data=self.request[0]
  5. server=self.request[1]
  6. client_address=self.client_address
  7. print('客户端发来的数据%s' %client_data)
  8. server.sendto(client_data.upper(),client_address)
  9. s=socketserver.ThreadingUDPServer(("127.0.0.1",8888),MyRequestHanlde)
  10. s.serve_forever()
  11. # 相当于:只负责循环地收
  12. # while True:
  13. # data,client_addr=server.recvfrom(1024)
  14. # 启动一个线程处理后续的事情(data,client_addr)

客户端

  1. import socket
  2. client=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # 流式协议=》tcp协议
  3. while True:
  4. msg=input('>>>: ').strip()
  5. client.sendto(msg.encode('utf-8'),('115.29.65.16',8888))
  6. res=client.recvfrom(1024)
  7. print(res)
  8. client.close()

进程

同步异步、阻塞与非阻塞

  1. 切换(CPU)分为两种情况
  2. 1.当一个程序遇到IO操作的时候,操作系统会剥夺该程序的CPU执行权限
  3. 作用:提高了CPU的利用率 并且也不影响程序的执行效率
  4. 2.当一个程序长时间占用CPU的时候,操作吸引也会剥夺该程序的CPU执行权限
  5. 弊端:降低了程序的执行效率(原本时间+切换时间)
  6. 同步和异步
  7. """描述的是任务的提交方式"""
  8. 同步:任务提交之后,原地等待任务的返回结果,等待的过程中不做任何事(干等)
  9. 程序层面上表现出来的感觉就是卡住了
  10. 异步:任务提交之后,不原地等待任务的返回结果,直接去做其他事情
  11. 我提交的任务结果如何获取?
  12. 任务的返回结果会有一个异步回调机制自动处理
  13. 阻塞非阻塞
  14. """描述的程序的运行状态"""
  15. 阻塞:阻塞态
  16. 非阻塞:就绪态、运行态
  17. 理想状态:我们应该让我们的写的代码永远处于就绪态和运行态之间切换

同步代码

  1. import time
  2. def func():
  3. time.sleep(3)
  4. print('hello world')
  5. if __name__ == '__main__':
  6. res = func() # 同步调用
  7. print('hahaha')

开启进程的二种方式

第一种直接使用multiprocessing的process创建对象

第二种使用类继承process类,实现run方法

  1. # 第一种(直接使用)
  2. from multiprocessing import Process
  3. import time
  4. def task(name):
  5. print('%s is running'%name)
  6. time.sleep(3)
  7. print('%s is over'%name)
  8. if __name__ == '__main__':
  9. # 1 创建一个对象
  10. p = Process(target=task, args=('jason',)) #传入需要执行的函数,及函数参数
  11. # 容器类型哪怕里面只有1个元素 建议要用逗号隔开
  12. # 2 开启进程
  13. p.start() # 告诉操作系统帮你创建一个进程 异步
  14. print('主')
  15. """
  16. windows操作系统下 创建进程一定要在main内创建
  17. 因为windows下创建进程类似于模块导入的方式
  18. 会从上往下依次执行代码
  19. linux中则是直接将代码完整的拷贝一份
  20. """
  21. # 第二种方式 类的继承
  22. from multiprocessing import Process
  23. import time
  24. class MyProcess(Process): #继承process方法
  25. def run(self): #需要实现run方法
  26. print('hello bf girl')
  27. time.sleep(1)
  28. print('get out!')
  29. if __name__ == '__main__':
  30. p = MyProcess()
  31. p.start()
  32. print('主')

join方法

start方法:开启p进程

join方法:阻塞创建的p进程运行,主进程等待子进程p运行结束后再继续往后执行。

  1. from multiprocessing import Process
  2. import time
  3. def task(name, n):
  4. print('%s is running'%name)
  5. time.sleep(n)
  6. print('%s is over'%name)
  7. if __name__ == '__main__':
  8. # p1 = Process(target=task, args=('jason', 1))
  9. # p2 = Process(target=task, args=('egon', 2))
  10. # p3 = Process(target=task, args=('tank', 3))
  11. # start_time = time.time()
  12. # p1.start()
  13. # p2.start()
  14. # p3.start() # 仅仅是告诉操作系统要创建进程
  15. # # time.sleep(50000000000000000000)
  16. # # p.join() # 主进程等待子进程p运行结束之后再继续往后执行
  17. # p1.join()
  18. # p2.join()
  19. # p3.join()
  20. start_time = time.time()
  21. p_list = []
  22. for i in range(1, 4):
  23. p = Process(target=task, args=('子进程%s'%i, i))
  24. p.start()
  25. p_list.append(p)
  26. for p in p_list:
  27. p.join()
  28. print('主', time.time() - start_time)

进程间数据隔离

进程间数据并不共享

  1. 创建进程就是在内存中申请一块内存空间将需要运行的代码丢进去
  2. 一个进程对应在内存中就是一块独立的内存空间
  3. 多个进程对应在内存中就是多块独立的内存空间
  4. 进程与进程之间数据默认情况下是无法直接交互,如果想交互可以借助于第三方工具、模块
  1. from multiprocessing import Process
  2. money = 100
  3. def task():
  4. global money # 局部修改全局
  5. money = 666
  6. print('子',money)
  7. if __name__ == '__main__':
  8. p = Process(target=task)
  9. p.start()
  10. p.join()
  11. print(money)
  12. 打印结果:
  13. 666
  14. 100

进程对象及其他方法

os.getppid() 查看当前父进程的进程号

os.getpid() 查看当前进程号

current_process().pid # 查看当前进程的进程号

p.terminate() # 杀死当前进程

  1. 是告诉操作系统帮你去杀死当前进程 但是需要一定的时间 而代码的运行速度极快

p.is_alive() # 判断当前进程是否存活

  1. from multiprocessing import Process, current_process
  2. import time
  3. import os
  4. def task():
  5. # print('%s is running'%current_process().pid) # 查看当前进程的进程号
  6. print('%s is running'%os.getpid()) # 查看当前进程的进程号
  7. # print('子进程的主进程号%s'%os.getppid()) # 查看当前进程的进程号
  8. time.sleep(30)
  9. if __name__ == '__main__':
  10. p = Process(target=task)
  11. p.start()
  12. p.terminate() # 杀死当前进程
  13. # 是告诉操作系统帮你去杀死当前进程 但是需要一定的时间 而代码的运行速度极快
  14. time.sleep(0.1)
  15. print(p.is_alive()) # 判断当前进程是否存活
  16. """
  17. 一般情况下我们会默认将
  18. 存储布尔值的变量名
  19. 和返回的结果是布尔值的方法名
  20. 都起成以is_开头
  21. """
  22. print('主')
  23. # print('主',current_process().pid)
  24. # print('主',os.getpid())
  25. # print('主主',os.getppid()) # 获取父进程的pid号

僵尸与孤儿进程

  1. # 僵尸进程
  2. """
  3. 死了但是没有死透
  4. 当你开设了子进程之后 该进程死后不会立刻释放占用的进程号
  5. 因为我要让父进程能够查看到它开设的子进程的一些基本信息 占用的pid号 运行时间。。。
  6. 所有的进程都会步入僵尸进程
  7. 父进程不死并且在无限制的创建子进程并且子进程也不结束
  8. 回收子进程占用的pid号
  9. 父进程等待子进程运行结束
  10. 父进程调用join方法
  11. """
  12. # 孤儿进程
  13. """
  14. 子进程存活,父进程意外死亡
  15. 操作系统会开设一个“儿童福利院”专门管理孤儿进程回收相关资源
  16. """
  1. from multiprocessing import Process
  2. import time
  3. def run():
  4. print('hello world')
  5. time.sleep(3)
  6. print('get out')
  7. if __name__ == '__main__':
  8. p = Process(target=run)
  9. p.start()
  10. print('主')

守护进程

1.1、什么是守护进程?

  1. **1、*_守护进程会在主进程代码运行结束的情况下,立即挂掉。*_**
  2. **2、守护进程本身就是一个子进程。**
  3. **3、主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,**

p.daemon = True # 将进程p设置成守护进程 这一句一定要放在start方法上面才有效否则会直接报错
p.start()

  1. from multiprocessing import Process
  2. import time
  3. def task(name):
  4. print('%s总管正在活着'% name)
  5. time.sleep(3)
  6. print('%s总管正在死亡' % name)
  7. if __name__ == '__main__':
  8. p = Process(target=task,args=('egon',))
  9. # p = Process(target=task,kwargs={'name':'egon'})
  10. p.daemon = True # 将进程p设置成守护进程 这一句一定要放在start方法上面才有效否则会直接报错
  11. p.start()
  12. print('皇帝jason寿终正寝')

互斥锁

from multiprocessing import Process, Lock

多个进程操作同一份数据的时候,会出现数据错乱的问题

针对上述问题,解决方式就是加锁处理:将并发变成串行,牺牲效率但是保证了数据的安全

mutex = Lock()

mutex.acquire() #进程抢锁

干活,操作同一个数据

mutex.release() #进程释放锁

  1. 注意:
  2. 1.锁不要轻易的使用,容易造成死锁现象(我们写代码一般不会用到,都是内部封装好的)
  3. 2.锁只在处理数据的部分加来保证数据安全(只在争抢数据的环节加锁处理即可)
  1. from multiprocessing import Process, Lock
  2. import json
  3. import time
  4. import random
  5. # 查票
  6. def search(i):
  7. # 文件操作读取票数
  8. with open('data', 'r', encoding='utf8') as f:
  9. dic = json.load(f)
  10. print('用户%s查询余票:%s'%(i, dic.get('ticket_num')))
  11. # 字典取值不要用[]的形式 推荐使用get 你写的代码打死都不能报错!!!
  12. # 买票 1.先查 2.再买
  13. def buy(i):
  14. # 先查票
  15. with open('data', 'r', encoding='utf8') as f:
  16. dic = json.load(f)
  17. # 模拟网络延迟
  18. time.sleep(random.randint(1,3))
  19. # 判断当前是否有票
  20. if dic.get('ticket_num') > 0:
  21. # 修改数据库 买票
  22. dic['ticket_num'] -= 1
  23. # 写入数据库
  24. with open('data', 'w', encoding='utf8') as f:
  25. json.dump(dic,f)
  26. print('用户%s买票成功'%i)
  27. else:
  28. print('用户%s买票失败'%i)
  29. # 整合上面两个函数
  30. def run(i, mutex):
  31. search(i)
  32. # 给买票环节加锁处理
  33. # 抢锁
  34. mutex.acquire()
  35. buy(i)
  36. # 释放锁
  37. mutex.release()
  38. if __name__ == '__main__':
  39. # 在主进程中生成一把锁 让所有的子进程抢 谁先抢到谁先买票
  40. mutex = Lock()
  41. for i in range(1,11):
  42. p = Process(target=run, args=(i, mutex))
  43. p.start()

进程队列 Queue

  1. 队列:先进先出
  2. 堆栈:先进后出
  1. from multiprocessing import Queue
  2. q = Queue() # 括号内可以放数字来限制队列的大小
  3. q.put() # 放数据 当队列满了再放 阻塞
  4. q.get() # 取数据 当队列空了再取 阻塞
  5. q.full() # 判断队列是否满了
  6. q.empty() # 判断队列是否空了
  7. q.get_nowait() # 取数据的时候如果没有数据直接报错
  8. q.get(timeout=5) # 取数据的时候如果没有数据等5s还没有则直接报错
  1. try:
  2. v6 = q.get(timeout=3)
  3. print(v6)
  4. except Exception as e:
  5. print('一滴都没有了!')
  1. from multiprocessing import Queue
  2. # 创建一个队列
  3. q = Queue(5) # 括号内可以传数字 标示生成的队列最大可以同时存放的数据量
  4. # 往队列中存数据
  5. q.put(111)
  6. q.put(222)
  7. q.put(333)
  8. # print(q.full()) # 判断当前队列是否满了
  9. # print(q.empty()) # 判断当前队列是否空了
  10. q.put(444)
  11. q.put(555)
  12. # print(q.full()) # 判断当前队列是否满了
  13. # q.put(666) # 当队列数据放满了之后 如果还有数据要放程序会阻塞 直到有位置让出来 不会报错
  14. """
  15. 存取数据 存是为了更好的取
  16. 千方百计的存、简单快捷的取
  17. 同在一个屋檐下
  18. 差距为何那么大
  19. """
  20. # 去队列中取数据
  21. v1 = q.get()
  22. v2 = q.get()
  23. v3 = q.get()
  24. v4 = q.get()
  25. v5 = q.get()
  26. # print(q.empty())
  27. # V6 = q.get_nowait() # 没有数据直接报错queue.Empty
  28. # v6 = q.get(timeout=3) # 没有数据之后原地等待三秒之后再报错 queue.Empty
  29. try:
  30. v6 = q.get(timeout=3)
  31. print(v6)
  32. except Exception as e:
  33. print('一滴都没有了!')
  34. # # v6 = q.get() # 队列中如果已经没有数据的话 get方法会原地阻塞
  35. # print(v1, v2, v3, v4, v5, v6)
  36. """
  37. q.full()
  38. q.empty()
  39. q.get_nowait()
  40. 在多进程的情况下是不精确
  41. """

IPC机制-进程+队列

进程间通信

  1. """
  2. 进程之间是无法直接进行数据交互的,但是可以通过队列或者管道实现数据交互
  3. 管道:
  4. 队列:管道+锁
  5. 本地测试的时候才可能会用到Queue,实际生产用的都是别人封装好的功能非常强大的工具
  6. redis
  7. kafka
  8. RQ
  9. """
  1. from multiprocessing import Queue, Process
  2. """
  3. 研究思路
  4. 1.主进程跟子进程借助于队列通信
  5. 2.子进程跟子进程借助于队列通信
  6. """
  7. def producer(q):
  8. q.put('我是23号技师 很高兴为您服务')
  9. def consumer(q):
  10. print(q.get())
  11. if __name__ == '__main__':
  12. q = Queue() #创建一个队列
  13. p = Process(target=producer,args=(q,)) #将q队列传入,调用put方法
  14. p1 = Process(target=consumer,args=(q,)) #将q队列传入,调用get方法
  15. p.start()
  16. p1.start()

生产消费者模型

  1. """
  2. 生产者:生产/制造东西的
  3. 消费者:消费/处理东西的
  4. 该模型除了上述两个之外还需要一个媒介
  5. 生活中的例子做包子的将包子做好后放在蒸笼(媒介)里面,买包子的取蒸笼里面拿
  6. 厨师做菜做完之后用盘子装着给你消费者端过去
  7. 生产者和消费者之间不是直接做交互的,而是借助于媒介做交互
  8. 生产者(做包子的) + 消息队列(蒸笼) + 消费者(吃包子的)
  9. """
  1. from multiprocessing import Process, Queue, JoinableQueue
  2. import time
  3. import random
  4. def producer(name,food,q):
  5. for i in range(5):
  6. data = '%s生产了%s%s'%(name,food,i)
  7. # 模拟延迟
  8. time.sleep(random.randint(1,3))
  9. print(data)
  10. # 将数据放入 队列中
  11. q.put(data)
  12. def consumer(name,q):
  13. # 消费者胃口很大 光盘行动
  14. while True:
  15. food = q.get() # 没有数据就会卡住
  16. # 判断当前是否有结束的标识
  17. # if food is None:break
  18. time.sleep(random.randint(1,3))
  19. print('%s吃了%s'%(name,food))
  20. q.task_done() # 告诉队列你已经从里面取出了一个数据并且处理完毕了
  21. if __name__ == '__main__':
  22. # q = Queue()
  23. q = JoinableQueue() #每当你往该队列中存入数据的时候 内部会有一个计数器+1
  24. p1 = Process(target=producer,args=('大厨egon','包子',q))
  25. p2 = Process(target=producer,args=('马叉虫tank','泔水',q))
  26. #消费者
  27. c1 = Process(target=consumer,args=('春哥',q))
  28. c2 = Process(target=consumer,args=('新哥',q))
  29. p1.start()
  30. p2.start()
  31. # 将消费者设置成守护进程
  32. c1.daemon = True
  33. c2.daemon = True
  34. c1.start() #开始消费
  35. c2.start()
  36. p1.join()
  37. p2.join()
  38. # 等待生产者生产完毕之后 往队列中添加特定的结束符号
  39. # q.put(None) # 肯定在所有生产者生产的数据的末尾
  40. q.join() # 等待队列中所有的数据被取完再执行往下执行代码
  41. """
  42. JoinableQueue 每当你往该队列中存入数据的时候 内部会有一个计数器+1
  43. 没当你调用task_done的时候 计数器-1
  44. q.join() 当计数器为0的时候 才往后运行
  45. """
  46. # 只要q.join执行完毕 说明消费者已经处理完数据了 消费者就没有存在的必要了

线程

致命三问

  • 什么是线程 ```css “”” 进程:资源单位 线程:执行单位

将操作系统比喻成一个大的工厂 那么进程就相当于工厂里面的车间 而线程就是车间里面的流水线

每一个进程肯定自带一个线程

再次总结: 进程:资源单位(起一个进程仅仅只是在内存空间中开辟一块独立的空间) 线程:执行单位(真正被cpu执行的其实是进程里面的线程,线程指的就是代码的执行过程,执行代码中所需要使用到的资源都找所在的进程索要)

进程和线程都是虚拟单位,只是为了我们更加方便的描述问题 “””

  1. -
  2. 为何要有线程
  3. ```python
  4. """
  5. 开设进程
  6. 1.申请内存空间 耗资源
  7. 2.“拷贝代码” 耗资源
  8. 开线程
  9. 一个进程内可以开设多个线程,在用一个进程内开设多个线程无需再次申请内存空间操作
  10. 总结:
  11. 开设线程的开销要远远的小于进程的开销
  12. 同一个进程下的多个线程数据是共享的!!!
  13. """
  14. 我们要开发一款文本编辑器
  15. 获取用户输入的功能
  16. 实时展示到屏幕的功能
  17. 自动保存到硬盘的功能
  18. 针对上面这三个功能,开设进程还是线程合适???
  19. 开三个线程处理上面的三个功能更加的合理
  • 如何使用
  • 线程理论

  1. """
  2. 进程:资源单位
  3. 线程:执行单位
  4. 线程才是真正干活的人,干活的过程中需要的资源由线程所在的进程提供
  5. 每一个进程肯定都自带一个线程
  6. 同一个进程内可以创建多个线程
  7. """
  8. """
  9. 开进程
  10. 申请内存空间
  11. ”拷贝代码“
  12. 消耗资源较大
  13. 开线程
  14. 同一个进程内创建多个线程 无需上述两部操作,消耗资源相对较小
  15. 智商
  16. 情商
  17. 搜商
  18. """

今日内容概要

  • 开启线程的两种方式
  • TCP服务端实现并发的效果
  • 线程对象的join方法
  • 线程间数据共享
  • 线程对象属性及其他方法
  • 守护线程
  • 线程互斥锁
  • GIL全局解释器锁
  • 多进程与多线程的实际应用场景

开启线程的两种方式

与开启进程方式类似,直接使用thread创建或者类继承

  1. # from multiprocessing import Process
  2. # from threading import Thread
  3. # import time
  4. #
  5. #
  6. # def task(name):
  7. # print('%s is running'%name)
  8. # time.sleep(1)
  9. # print('%s is over'%name)
  10. #
  11. #
  12. # # 开启线程不需要在main下面执行代码 直接书写就可以
  13. # # 但是我们还是习惯性的将启动命令写在main下面
  14. # t = Thread(target=task,args=('egon',))
  15. # # p = Process(target=task,args=('jason',))
  16. # # p.start()
  17. # t.start() # 创建线程的开销非常小 几乎是代码一执行线程就已经创建了
  18. # print('主')
  19. from threading import Thread
  20. import time
  21. class MyThead(Thread):
  22. def __init__(self, name):
  23. """针对刷个下划线开头双下滑线结尾(__init__)的方法 统一读成 双下init"""
  24. # 重写了别人的方法 又不知道别人的方法里有啥 你就调用父类的方法
  25. super().__init__()
  26. self.name = name
  27. def run(self):
  28. print('%s is running'%self.name)
  29. time.sleep(1)
  30. print('egon DSB')
  31. if __name__ == '__main__':
  32. t = MyThead('egon')
  33. t.start()
  34. print('主')

TCP服务端实现并发

  1. import socket
  2. from threading import Thread
  3. from multiprocessing import Process
  4. """
  5. 服务端
  6. 1.要有固定的IP和PORT
  7. 2.24小时不间断提供服务
  8. 3.能够支持并发
  9. 从现在开始要养成一个看源码的习惯
  10. 我们前期要立志称为拷贝忍者 卡卡西 不需要有任何的创新
  11. 等你拷贝到一定程度了 就可以开发自己的思想了
  12. """
  13. server =socket.socket() # 括号内不加参数默认就是TCP协议
  14. server.bind(('127.0.0.1',8080))
  15. server.listen(5)
  16. # 将服务的代码单独封装成一个函数
  17. def talk(conn):
  18. # 通信循环
  19. while True:
  20. try:
  21. data = conn.recv(1024)
  22. # 针对mac linux 客户端断开链接后
  23. if len(data) == 0: break
  24. print(data.decode('utf-8'))
  25. conn.send(data.upper())
  26. except ConnectionResetError as e:
  27. print(e)
  28. break
  29. conn.close()
  30. # 链接循环
  31. while True:
  32. conn, addr = server.accept() # 接客
  33. # 叫其他人来服务客户
  34. # t = Thread(target=talk,args=(conn,))
  35. t = Process(target=talk,args=(conn,))
  36. t.start()
  37. """客户端"""
  38. import socket
  39. client = socket.socket()
  40. client.connect(('127.0.0.1',8080))
  41. while True:
  42. client.send(b'hello world')
  43. data = client.recv(1024)
  44. print(data.decode('utf-8'))

线程对象的join方法

  1. from threading import Thread
  2. import time
  3. def task(name):
  4. print('%s is running'%name)
  5. time.sleep(3)
  6. print('%s is over'%name)
  7. if __name__ == '__main__':
  8. t = Thread(target=task,args=('egon',))
  9. t.start()
  10. t.join() # 主线程等待子线程运行结束再执行
  11. print('主')

同一个进程下的多个线程数据是共享的

  1. from threading import Thread
  2. import time
  3. money = 100
  4. def task():
  5. global money
  6. money = 666
  7. print(money)
  8. if __name__ == '__main__':
  9. t = Thread(target=task)
  10. t.start()
  11. t.join()
  12. print(money)

线程对象属性及其他方法

  1. print('主',active_count()) # 统计当前正在活跃的线程数
  2. print('主',os.getpid())
  3. print('主',current_thread().name) # 获取线程名字
  1. from threading import Thread, active_count, current_thread
  2. import os,time
  3. def task(n):
  4. # print('hello world',os.getpid())
  5. print('hello world',current_thread().name)
  6. time.sleep(n)
  7. if __name__ == '__main__':
  8. t = Thread(target=task,args=(1,))
  9. t1 = Thread(target=task,args=(2,))
  10. t.start()
  11. t1.start()
  12. t.join()
  13. print('主',active_count()) # 统计当前正在活跃的线程数
  14. # print('主',os.getpid())
  15. # print('主',current_thread().name) # 获取线程名字

守护线程

2.1、什么是守护线程?

  1. **1、*_守护线程会在"该进程内所有非守护线程全部都运行完毕后,守护线程才会挂掉"。并不是主线程运行完毕后守护线程挂掉。这一点是和守护进程的区别之处!*_**
  2. **_*2、*_*_守护线程守护的是:当前进程内所有的子线程!*_<br />

**

  1. **3、主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。**

t = Thread(target=task,args=(‘egon’,))

t.daemon = True

  1. 1.例子:当只有一个子线程并且为守护线程,那么这个守护线程就会等待主线程运行完毕后挂掉
  2. # from threading import Thread
  3. # import time
  4. #
  5. #
  6. # def task(name):
  7. # print('%s is running'%name)
  8. # time.sleep(1)
  9. # print('%s is over'%name)
  10. #
  11. #
  12. # if __name__ == '__main__':
  13. # t = Thread(target=task,args=('egon',))
  14. # t.daemon = True
  15. # t.start()
  16. # print('主')
  17. """
  18. 主线程运行结束之后不会立刻结束 会等待所有其他非守护线程结束才会结束
  19. 因为主线程的结束意味着所在的进程的结束
  20. """
  21. 2.例子:当有多个子线程时,守护线程就会等待所有的子线程运行完毕后,守护线程才会挂掉
  22. (这一点和主线程是一样的,都是等待所有的子线程运行完毕后才会挂掉)。
  23. from threading import Thread
  24. import time
  25. def foo():
  26. print(123)
  27. time.sleep(1)
  28. print('end123')
  29. def func():
  30. print(456)
  31. time.sleep(3)
  32. print('end456')
  33. if __name__ == '__main__':
  34. t1 = Thread(target=foo)
  35. t2 = Thread(target=func)
  36. t1.daemon = True
  37. t1.start()
  38. t2.start()
  39. print('主.......')

线程互斥锁

与进程类似

  1. from threading import Thread,Lock
  2. import time
  3. money = 100
  4. mutex = Lock()
  5. def task():
  6. global money
  7. mutex.acquire()
  8. tmp = money
  9. time.sleep(0.1)
  10. money = tmp - 1
  11. mutex.release()
  12. if __name__ == '__main__':
  13. t_list = []
  14. for i in range(100):
  15. t = Thread(target=task)
  16. t.start()
  17. t_list.append(t)
  18. for t in t_list:
  19. t.join()
  20. print(money)

GIL全局解释器锁

Ps:博客园密码:xiaoyuanqujing@666

  1. """
  2. In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple
  3. native threads from executing Python bytecodes at once. This lock is necessary mainly
  4. because CPython’s memory management is not thread-safe. (However, since the GIL
  5. exists, other features have grown to depend on the guarantees that it enforces.)
  6. """
  7. """
  8. python解释器其实有多个版本
  9. Cpython
  10. Jpython
  11. Pypypython
  12. 但是普遍使用的都是CPython解释器
  13. 在CPython解释器中GIL是一把互斥锁,用来阻止同一个进程下的多个线程的同时执行
  14. 同一个进程下的多个线程无法利用多核优势!!!
  15. 疑问:python的多线程是不是一点用都没有???无法利用多核优势
  16. 因为cpython中的内存管理不是线程安全的
  17. 内存管理(垃圾回收机制)
  18. 1.应用计数
  19. 2.标记清楚
  20. 3.分代回收
  21. """
  22. """
  23. 重点:
  24. 1.GIL不是python的特点而是CPython解释器的特点
  25. 2.GIL是保证解释器级别的数据的安全
  26. 3.GIL会导致同一个进程下的多个线程的无法同时执行即无法利用多核优势(******)
  27. 4.针对不同的数据还是需要加不同的锁处理
  28. 5.解释型语言的通病:同一个进程下多个线程无法利用多核优势
  29. """

GIL与普通互斥锁的区别

  1. from threading import Thread,Lock
  2. import time
  3. mutex = Lock()
  4. money = 100
  5. def task():
  6. global money
  7. # with mutex:
  8. # tmp = money
  9. # time.sleep(0.1)
  10. # money = tmp -1
  11. mutex.acquire()
  12. tmp = money
  13. time.sleep(0.1) # 只要你进入IO了 GIL会自动释放,但是别人没拿到money锁,gil会回来
  14. money = tmp - 1
  15. mutex.release()
  16. if __name__ == '__main__':
  17. t_list = []
  18. for i in range(100):
  19. t = Thread(target=task)
  20. t.start()
  21. t_list.append(t)
  22. for t in t_list:
  23. t.join()
  24. print(money)
  25. """
  26. 100个线程起起来之后 要先去抢GIL
  27. 我进入io GIL自动释放 但是我手上还有一个自己的互斥锁
  28. 其他线程虽然抢到了GIL但是抢不到互斥锁
  29. 最终GIL还是回到你的手上 你去操作数据
  30. """

同一个进程下的多线程无法利用多核优势,是不是就没有用了

  1. """
  2. 多线程是否有用要看具体情况
  3. 单核:四个任务(IO密集型\计算密集型)
  4. 多核:四个任务(IO密集型\计算密集型)
  5. """
  6. # 计算密集型 每个任务都需要10s
  7. 单核(不用考虑了)
  8. 多进程:额外的消耗资源
  9. 多线程:介绍开销
  10. 多核
  11. 多进程:总耗时 10+
  12. 多线程:总耗时 40+
  13. # IO密集型
  14. 多核
  15. 多进程:相对浪费资源
  16. 多线程:更加节省资源

代码验证-针对io\cpu密集

  1. # 计算密集型
  2. # from multiprocessing import Process
  3. # from threading import Thread
  4. # import os,time
  5. #
  6. #
  7. # def work():
  8. # res = 0
  9. # for i in range(10000000):
  10. # res *= i
  11. #
  12. # if __name__ == '__main__':
  13. # l = []
  14. # print(os.cpu_count()) # 获取当前计算机CPU个数
  15. # start_time = time.time()
  16. # for i in range(12):
  17. # p = Process(target=work) # 1.4679949283599854
  18. # t = Thread(target=work) # 5.698534250259399
  19. # t.start()
  20. # # p.start()
  21. # # l.append(p)
  22. # l.append(t)
  23. # for p in l:
  24. # p.join()
  25. # print(time.time()-start_time)
  26. # IO密集型
  27. from multiprocessing import Process
  28. from threading import Thread
  29. import os,time
  30. def work():
  31. time.sleep(2)
  32. if __name__ == '__main__':
  33. l = []
  34. print(os.cpu_count()) # 获取当前计算机CPU个数
  35. start_time = time.time()
  36. for i in range(4000):
  37. # p = Process(target=work) # 21.149890184402466
  38. t = Thread(target=work) # 3.007986068725586
  39. t.start()
  40. # p.start()
  41. # l.append(p)
  42. l.append(t)
  43. for p in l:
  44. p.join()
  45. print(time.time()-start_time)

总结

  1. """
  2. 多进程和多线程都有各自的优势
  3. 并且我们后面在写项目的时候通常可以
  4. 多进程下面再开设多线程
  5. 这样的话既可以利用多核也可以介绍资源消耗
  6. """

TCP服务端并发

简单实现多线程tcp

  1. import socket
  2. from threading import Thread
  3. def communication(conn):
  4. while True:
  5. try:
  6. data = conn.recv(1024)
  7. if len(data) == 0: break
  8. conn.send(data.upper())
  9. except ConnectionResetError as e:
  10. print(e)
  11. break
  12. conn.close()
  13. def server(ip, port):
  14. server = socket.socket()
  15. server.bind((ip, port))
  16. server.listen(5)
  17. while True:
  18. conn, addr = server.accept()
  19. # 开设多进程或者多线程处理客户端通信
  20. t = Thread(target=communication, args=(conn,)) #没接收到一个连接则创建一个线程
  21. t.start()
  22. if __name__ == '__main__':
  23. s = Thread(target=server, args=('127.0.0.1', 8080))
  24. s.start()

今日内容概要

  • 死锁与递归锁(了解)
  • 信号量(了解)
  • Event事件(了解)
  • 线程q(了解)
  • 进程池与线程池(掌握)
  • 协程(了解)
  • 协程实现TCP服务端的并发效果(了解)

死锁

当你知道锁的使用抢锁必须要释放锁,其实你在操作锁的时候也极其容易产生死锁现象(整个程序卡死 阻塞)

  1. from threading import Thread, Lock
  2. import time
  3. mutexA = Lock()
  4. mutexB = Lock()
  5. # 类只要加括号多次 产生的肯定是不同的对象
  6. # 如果你想要实现多次加括号等到的是相同的对象 单例模式
  7. class MyThead(Thread):
  8. def run(self):
  9. self.func1()
  10. self.func2()
  11. def func1(self):
  12. mutexA.acquire() #抢A锁
  13. print('%s 抢到A锁'% self.name) # 获取当前线程名
  14. mutexB.acquire() #等func2释放B锁,再抢,阻塞在这里
  15. print('%s 抢到B锁'% self.name)
  16. mutexB.release()
  17. mutexA.release()
  18. def func2(self):
  19. mutexB.acquire() #抢B锁
  20. print('%s 抢到B锁'% self.name)
  21. time.sleep(2)
  22. mutexA.acquire() #等func1释放A锁,再抢,阻塞在这里
  23. print('%s 抢到A锁'% self.name) # 获取当前线程名
  24. mutexA.release()
  25. mutexB.release()
  26. if __name__ == '__main__':
  27. for i in range(10):
  28. t = MyThead()
  29. t.start()

递归锁 RLOCK

  1. """
  2. 递归锁的特点
  3. 可以被连续的acquire和release
  4. 但是只能被第一个抢到这把锁执行上述操作
  5. 它的内部有一个计数器 每acquire一次计数加一 每realse一次计数减一
  6. 只要计数不为0 那么其他人都无法抢到该锁
  7. """
  8. 死锁解决方法:
  9. 递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock
  10. 这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require
  11. 直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:
  12. # 类只要加括号多次 产生的肯定是不同的对象
  13. # 如果你想要实现多次加括号等到的是相同的对象 单例模式
  14. # 将上述的
  15. mutexA = Lock()
  16. mutexB = Lock()
  17. # 换成
  18. mutexA = mutexB = RLock()

信号量 Semaphore

信号量在不同的阶段可能对应不同的技术点

在并发编程中信号量指的是锁!!!

  1. """
  2. 如果我们将互斥锁比喻成一个厕所的话
  3. 那么信号量就相当于多个厕所
  4. """
  5. from threading import Thread, Semaphore
  6. import time
  7. import random
  8. """
  9. 利用random模块实现打印随机验证码(搜狗的一道笔试题)
  10. """
  11. sm = Semaphore(5) # 括号内写数字 写几就表示开设几个坑位
  12. def task(name):
  13. sm.acquire() #最多可以获取到5把锁,其他的等5把释放后再获取锁,否则会阻塞在这里
  14. print('%s 正在蹲坑'% name)
  15. time.sleep(random.randint(1, 5))
  16. sm.release()
  17. if __name__ == '__main__':
  18. for i in range(20): #会运行thread20次
  19. t = Thread(target=task, args=('伞兵%s号'%i, ))
  20. t.start()

Event事件

一些进程/线程需要等待另外一些进程/线程运行完毕之后才能运行,类似于发射信号一样

event = Event() # 造了一个红绿灯

event.set() #运行到,wait开始执行

event.wait() # 等待别人给你发信号

  1. from threading import Thread, Event
  2. import time
  3. event = Event() # 造了一个红绿灯
  4. def light():
  5. print('红灯亮着的')
  6. time.sleep(3)
  7. print('绿灯亮了')
  8. # 告诉等待红灯的人可以走了
  9. event.set()
  10. def car(name):
  11. print('%s 车正在灯红灯'%name)
  12. event.wait() # 等待别人给你发信号
  13. print('%s 车加油门飙车走了'%name)
  14. if __name__ == '__main__':
  15. t = Thread(target=light)
  16. t.start()
  17. for i in range(20):
  18. t = Thread(target=car, args=('%s'%i, ))
  19. t.start()

线程队列 queue

  1. """
  2. 同一个进程下多个线程数据是共享的
  3. 为什么先同一个进程下还会去使用队列呢
  4. 因为队列是
  5. 管道 + 锁
  6. 所以用队列还是为了保证数据的安全
  7. """
  8. import queue #进程使用的是Queue
  9. # 我们现在使用的队列都是只能在本地测试使用
  10. # 1 队列q 先进先出
  11. # q = queue.Queue(3)
  12. # q.put(1)
  13. # q.get()
  14. # q.get_nowait()
  15. # q.get(timeout=3)
  16. # q.full()
  17. # q.empty()
  18. # 后进先出q
  19. # q = queue.LifoQueue(3) # last in first out
  20. # q.put(1)
  21. # q.put(2)
  22. # q.put(3)
  23. # print(q.get()) # 3
  24. # 优先级q 你可以给放入队列中的数据设置进出的优先级
  25. q = queue.PriorityQueue(4)
  26. q.put((10, '111'))
  27. q.put((100, '222'))
  28. q.put((0, '333'))
  29. q.put((-5, '444'))
  30. print(q.get()) # (-5, '444')
  31. # put括号内放一个元祖 第一个放数字表示优先级
  32. # 需要注意的是 数字越小优先级越高!!!

进程池与线程池

先回顾之前TCP服务端实现并发的效果是怎么玩的

每来一个人就开设一个进程或者线程去处理

  1. """
  2. 无论是开设进程也好还是开设线程也好 是不是都需要消耗资源
  3. 只不过开设线程的消耗比开设进程的稍微小一点而已
  4. 我们是不可能做到无限制的开设进程和线程的 因为计算机硬件的资源更不上!!!
  5. 硬件的开发速度远远赶不上软件呐
  6. 我们的宗旨应该是在保证计算机硬件能够正常工作的情况下最大限度的利用它
  7. """
  8. # 池的概念
  9. """
  10. 什么是池?
  11. 池是用来保证计算机硬件安全的情况下最大限度的利用计算机
  12. 它降低了程序的运行效率但是保证了计算机硬件的安全 从而让你写的程序能够正常运行
  13. """

基本使用

  1. from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
  2. import time
  3. import os
  4. # pool = ThreadPoolExecutor(5) # 池子里面固定只有五个线程
  5. # 括号内可以传数字 不传的话默认会开设当前计算机cpu个数五倍的线程
  6. pool = ProcessPoolExecutor(5)
  7. # 括号内可以传数字 不传的话默认会开设当前计算机cpu个数进程
  8. """
  9. 池子造出来之后 里面会固定存在五个线程
  10. 这个五个线程不会出现重复创建和销毁的过程
  11. 池子造出来之后 里面会固定的几个进程
  12. 这个几个进程不会出现重复创建和销毁的过程
  13. 池子的使用非常的简单
  14. 你只需要将需要做的任务往池子中提交即可 自动会有人来服务你
  15. """
  16. def task(n):
  17. print(n,os.getpid())
  18. time.sleep(2)
  19. return n**n
  20. def call_back(n):
  21. print('call_back>>>:',n.result())
  22. """
  23. 任务的提交方式
  24. 同步:提交任务之后原地等待任务的返回结果 期间不做任何事
  25. 异步:提交任务之后不等待任务的返回结果 执行继续往下执行
  26. 返回结果如何获取???
  27. 异步提交任务的返回结果 应该通过回调机制来获取
  28. 回调机制
  29. 就相当于给每个异步任务绑定了一个定时炸弹
  30. 一旦该任务有结果立刻触发爆炸
  31. """
  32. if __name__ == '__main__':
  33. # pool.submit(task, 1) # 朝池子中提交任务 异步提交
  34. # print('主')
  35. t_list = []
  36. for i in range(20): # 朝池子中提交20个任务
  37. # res = pool.submit(task, i) # <Future at 0x100f97b38 state=running>
  38. res = pool.submit(task, i).add_done_callback(call_back) #提交一个任务并绑定回调函数
  39. # print(res.result()) # result方法 同步提交
  40. # t_list.append(res)
  41. # 等待线程池中所有的任务执行完毕之后再继续往下执行
  42. # pool.shutdown() # 关闭线程池 等待线程池中所有的任务运行完毕
  43. # for t in t_list:
  44. # print('>>>:',t.result()) # 肯定是有序的
  45. """
  46. 程序有并发变成了串行
  47. 任务的为什么打印的是None
  48. res.result() 拿到的就是异步提交的任务的返回结果
  49. """

总结

  1. from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
  2. pool = ProcessPoolExecutor(5)
  3. pool.submit(task, i).add_done_callback(call_back)

协程

  1. """
  2. 进程:资源单位
  3. 线程:执行单位
  4. 协程:这个概念完全是程序员自己意淫出来的 根本不存在
  5. 单线程下实现并发
  6. 我们程序员自己再代码层面上检测我们所有的IO操作
  7. 一旦遇到IO了 我们在代码级别完成切换
  8. 这样给CPU的感觉是你这个程序一直在运行 没有IO
  9. 从而提升程序的运行效率
  10. 多道技术
  11. 切换+保存状态
  12. CPU两种切换
  13. 1.程序遇到IO
  14. 2.程序长时间占用
  15. TCP服务端
  16. accept
  17. recv
  18. 代码如何做到
  19. 切换+保存状态
  20. 切换
  21. 切换不一定是提升效率 也有可能是降低效率
  22. IO切 提升
  23. 没有IO切 降低
  24. 保存状态
  25. 保存上一次我执行的状态 下一次来接着上一次的操作继续往后执行
  26. yield
  27. """

验证协程切换是否就一定提升效率

协程切换并不串行快,如果有io再切换会更快

  1. # import time
  2. #
  3. # # 串行执行计算密集型的任务 用时1.2372429370880127
  4. # def func1():
  5. # for i in range(10000000):
  6. # i + 1
  7. #
  8. # def func2():
  9. # for i in range(10000000):
  10. # i + 1
  11. #
  12. # start_time = time.time()
  13. # func1()
  14. # func2()
  15. # print(time.time() - start_time)
  16. # 切换 + yield 用时:2.1247239112854004
  17. # import time
  18. #
  19. #
  20. # def func1():
  21. # while True:
  22. # 10000000 + 1
  23. # yield
  24. #
  25. #
  26. # def func2():
  27. # g = func1() # 先初始化出生成器
  28. # for i in range(10000000):
  29. # i + 1
  30. # next(g) #执行下一次func1
  31. #
  32. # start_time = time.time()
  33. # func2()
  34. # print(time.time() - start_time)

gevent 检测io

安装

  1. pip3 install gevent
  2. from gevent import monkey;monkey.patch_all()
  3. import time
  4. from gevent import spawn
  5. """
  6. gevent模块本身无法检测常见的一些io操作
  7. 在使用的时候需要你额外的导入一句话
  8. from gevent import monkey
  9. monkey.patch_all()
  10. 又由于上面的两句话在使用gevent模块的时候是肯定要导入的
  11. 所以还支持简写
  12. from gevent import monkey;monkey.patch_all()
  13. """
  14. def heng():
  15. print('哼')
  16. time.sleep(2)
  17. print('哼')
  18. def ha():
  19. print('哈')
  20. time.sleep(3)
  21. print('哈')
  22. def heiheihei():
  23. print('heiheihei')
  24. time.sleep(5)
  25. print('heiheihei')
  26. start_time = time.time()
  27. g1 = spawn(heng) #创建一个普通的Greenlet对象并切换;遇到io就切换
  28. g2 = spawn(ha)
  29. g3 = spawn(heiheihei)
  30. g1.join()
  31. g2.join() # 等待被检测的任务执行完毕 再往后继续执行
  32. g3.join()
  33. # heng()
  34. # ha()
  35. # print(time.time() - start_time) # 5.005702018737793
  36. print(time.time() - start_time) # 3.004199981689453 5.005439043045044

协程实现TCP服务端的并发

  1. # 服务端
  2. from gevent import monkey;monkey.patch_all()
  3. import socket
  4. from gevent import spawn
  5. def communication(conn):
  6. while True:
  7. try:
  8. data = conn.recv(1024)
  9. if len(data) == 0: break
  10. conn.send(data.upper())
  11. except ConnectionResetError as e:
  12. print(e)
  13. break
  14. conn.close()
  15. def server(ip, port):
  16. server = socket.socket()
  17. server.bind((ip, port))
  18. server.listen(5)
  19. while True:
  20. conn, addr = server.accept()
  21. spawn(communication, conn) #comunication是函数,conn是函数参数
  22. if __name__ == '__main__':
  23. g1 = spawn(server, '127.0.0.1', 8080)
  24. g1.join()
  25. # 客户端
  26. from threading import Thread, current_thread
  27. import socket
  28. def x_client():
  29. client = socket.socket()
  30. client.connect(('127.0.0.1',8080))
  31. n = 0
  32. while True:
  33. msg = '%s say hello %s'%(current_thread().name,n)
  34. n += 1
  35. client.send(msg.encode('utf-8'))
  36. data = client.recv(1024)
  37. print(data.decode('utf-8'))
  38. if __name__ == '__main__':
  39. for i in range(500):
  40. t = Thread(target=x_client)
  41. t.start()

总结

  1. """
  2. 理想状态:
  3. 我们可以通过
  4. 多进程下面开设多线程
  5. 多线程下面再开设协程序
  6. 从而使我们的程序执行效率提升
  7. """

IO模型

IO模型简介

  1. """
  2. 我们这里研究的IO模型都是针对网络IO的
  3. Stevens在文章中一共比较了五种IO Model:
  4. * blocking IO 阻塞IO
  5. * nonblocking IO 非阻塞IO
  6. * IO multiplexing IO多路复用
  7. * signal driven IO 信号驱动IO
  8. * asynchronous IO 异步IO
  9. 由signal driven IO(信号驱动IO)在实际中并不常用,所以主要介绍其余四种IO Model。
  10. """
  11. #1)等待数据准备 (Waiting for the data to be ready)
  12. #2)将数据从内核拷贝到进程中(Copying the data from the kernel to the process)
  13. 同步异步
  14. 阻塞非阻塞
  15. 常见的网络阻塞状态:
  16. accept
  17. recv
  18. recvfrom
  19. send虽然它也有io行为 但是不在我们的考虑范围

阻塞IO模型

  1. """
  2. 我们之前写的都是阻塞IO模型 协程除外
  3. """
  4. import socket
  5. server = socket.socket()
  6. server.bind(('127.0.0.1',8080))
  7. server.listen(5)
  8. while True:
  9. conn, addr = server.accept()
  10. while True:
  11. try:
  12. data = conn.recv(1024)
  13. if len(data) == 0:break
  14. print(data)
  15. conn.send(data.upper())
  16. except ConnectionResetError as e:
  17. break
  18. conn.close()
  19. # 在服务端开设多进程或者多线程 进程池线程池 其实还是没有解决IO问题
  20. 该等的地方还是得等 没有规避
  21. 只不过多个人等待的彼此互不干扰

非阻塞IO setblocking

需要不断循环遍历conn连接对象

  1. """
  2. 要自己实现一个非阻塞IO模型
  3. """
  4. import socket
  5. import time
  6. server = socket.socket()
  7. server.bind(('127.0.0.1', 8081))
  8. server.listen(5)
  9. server.setblocking(False)
  10. # 将所有的网络阻塞变为非阻塞
  11. r_list = []
  12. del_list = []
  13. while True:
  14. try:
  15. conn, addr = server.accept()
  16. r_list.append(conn)
  17. except BlockingIOError:
  18. # time.sleep(0.1)
  19. # print('列表的长度:',len(r_list))
  20. # print('做其他事')
  21. for conn in r_list:
  22. try:
  23. data = conn.recv(1024) # 没有消息 报错
  24. if len(data) == 0: # 客户端断开链接
  25. conn.close() # 关闭conn
  26. # 将无用的conn从r_list删除
  27. del_list.append(conn)
  28. continue
  29. conn.send(data.upper())
  30. except BlockingIOError:
  31. continue
  32. except ConnectionResetError:
  33. conn.close()
  34. del_list.append(conn)
  35. # 挥手无用的链接
  36. for conn in del_list:
  37. r_list.remove(conn)
  38. del_list.clear()
  39. # 客户端
  40. import socket
  41. client = socket.socket()
  42. client.connect(('127.0.0.1',8081))
  43. while True:
  44. client.send(b'hello world')
  45. data = client.recv(1024)
  46. print(data)

总结

  1. """
  2. 虽然非阻塞IO给你的感觉非常的牛逼
  3. 但是该模型会 长时间占用着CPU并且不干活 让CPU不停的空转
  4. 我们实际应用中也不会考虑使用非阻塞IO模型
  5. 任何的技术点都有它存在的意义
  6. 实际应用或者是思想借鉴
  7. """

IO多路复用 select

  1. """
  2. 当监管的对象只有一个的时候 其实IO多路复用连阻塞IO都比比不上!!!
  3. 但是IO多路复用可以一次性监管很多个对象
  4. server = socket.socket()
  5. conn,addr = server.accept()
  6. 监管机制是操作系统本身就有的 如果你想要用该监管机制(select)
  7. 需要你导入对应的select模块
  8. """
  9. import socket
  10. import select
  11. server = socket.socket()
  12. server.bind(('127.0.0.1',8080))
  13. server.listen(5)
  14. server.setblocking(False)
  15. read_list = [server]
  16. while True:
  17. r_list, w_list, x_list = select.select(read_list, [], []) #read_list = [server]
  18. """
  19. 帮你监管
  20. 一旦有人来了 立刻给你返回对应的监管对象
  21. """
  22. # print(res) # ([<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8080)>], [], [])
  23. # print(server)
  24. # print(r_list)
  25. for i in r_list: #
  26. """针对不同的对象做不同的处理"""
  27. if i is server:
  28. conn, addr = i.accept()
  29. # 也应该添加到监管的队列中
  30. read_list.append(conn)
  31. else:
  32. res = i.recv(1024)
  33. if len(res) == 0:
  34. i.close()
  35. # 将无效的监管对象 移除
  36. read_list.remove(i)
  37. continue
  38. print(res)
  39. i.send(b'heiheiheiheihei')
  40. # 客户端
  41. import socket
  42. client = socket.socket()
  43. client.connect(('127.0.0.1',8080))
  44. while True:
  45. client.send(b'hello world')
  46. data = client.recv(1024)
  47. print(data)

监管机制:select 、poll、epoll

  1. """
  2. 监管机制其实有很多
  3. select机制 windows linux都有
  4. poll机制 只在linux有 poll和select都可以监管多个对象 但是poll监管的数量更多
  5. 上述select和poll机制其实都不是很完美 当监管的对象特别多的时候
  6. 可能会出现 极其大的延时响应
  7. epoll机制 只在linux有
  8. 它给每一个监管对象都绑定一个回调机制
  9. 一旦有响应 回调机制立刻发起提醒
  10. 针对不同的操作系统还需要考虑不同检测机制 书写代码太多繁琐
  11. 有一个人能够根据你跑的平台的不同自动帮你选择对应的监管机制
  12. selectors模块
  13. """

异步IO asyncio

  1. """
  2. 异步IO模型是所有模型中效率最高的 也是使用最广泛的
  3. 相关的模块和框架
  4. 模块:asyncio模块
  5. 异步框架:sanic tronado twisted
  6. 速度快!!!
  7. """
  8. import threading
  9. import asyncio
  10. @asyncio.coroutine
  11. def hello():
  12. print('hello world %s'%threading.current_thread())
  13. yield from asyncio.sleep(1) # 换成真正的IO操作
  14. print('hello world %s' % threading.current_thread())
  15. loop = asyncio.get_event_loop()
  16. tasks = [hello(),hello()]
  17. loop.run_until_complete(asyncio.wait(tasks))
  18. loop.close()

四个IO模型对比

参考博客园图解,稍微了解即可

网络并发知识点梳理

  • 软件开发架构

  • 互联网协议

    1. """
    2. osi七层
    3. 五层
    4. 每一层都是干嘛的
    5. 以太网协议 广播风暴
    6. IP协议
    7. TCP/UDP
    8. """
  • 三次握手四次挥手

  • socket简介

  • TCP黏包问题 定制固定长度的报头

  • UDP协议

  • socketserver模块

  • 操作系统发展史

  • 多道技术

  • 进程理论

  • 开启进程的两种方式

  • 互斥锁

  • 生产者消费者模型

  • 线程理论

  • 开启线程的两种方式

  • GIL全局解释器锁

  • 进程池线程池

  • 协程的概念

  • IO模型的了解