一 什么是粘包

发送端可以是一K一K地发送数据,而接收端的应用程序可以两K两K地提走数据,当然也有可能一次提走3K或6K数据,或者一次只提走几个字节的数据,也就是说,应用程序所看到的数据是一个整体,或说是一个流(stream),一条消息有多少字节对应用程序是不可见的,因此TCP协议是面向流的协议,这也是容易出现粘包问题的原因。而UDP是面向消息的协议,每个UDP段都是一条消息,应用程序必须以消息为单位提取数据,不能一次提取任意字节的数据,这一点和TCP是很不同的。怎样定义消息呢?可以认为对方一次性write/send的数据为一个消息,需要明白的是当对方send一条信息的时候,无论底层怎样分段分片,TCP协议层会把构成整条消息的数据段排序完成后才呈现在内核缓冲区。
例如基于tcp的套接字客户端往服务端上传文件,发送时文件内容是按照一段一段的字节流发送的,在接收方看了,根本不知道该文件的字节流从何处开始,在何处结束

所谓粘包问题主要还是因为接收方不知道消息之间的界限,不知道一次性提取多少字节的数据所造成的。
此外,发送方引起的粘包是由TCP协议本身造成的,TCP为提高传输效率,发送方往往要收集到足够多的数据后才发送一个TCP段。若连续几次需要send的数据都很少,通常TCP会根据优化算法把这些数据合成一个TCP段后一次发送出去,这样接收方就收到了粘包数据。

1.TCP(transport control protocol,传输控制协议)是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务器端)都要有一一成对的socket,因此,发送端为了将多个发往接收端的包,更有效的发到对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样,接收端,就难于分辨出来了,必须提供科学的拆包机制。 即面向流的通信是无消息保护边界的。
2.UDP(user datagram protocol,用户数据报协议)是无连接的,面向消息的,提供高效率服务。不会使用块的合并优化算法,, 由于UDP支持的是一对多的模式,所以接收端的skbuff(套接字缓冲区)采用了链式结构来记录每一个到达的UDP包,在每个UDP包中就有了消息头(消息来源地址,端口等信息),这样,对于接收端来说,就容易进行区分处理了。 即面向消息的通信是有消息保护边界的。
3.tcp是基于数据流的,于是收发的消息不能为空,这就需要在客户端和服务端都添加空消息的处理机制,防止程序卡住,而udp是基于数据报的,即便是你输入的是空内容(直接回车),那也不是空消息,udp协议会帮你封装上消息头,实验略
udp的recvfrom是阻塞的,一个recvfrom(x)必须对唯一一个sendinto(y),收完了x个字节的数据就算完成,若是y>x数据就丢失,这意味着udp根本不会粘包,但是会丢数据,不可靠
tcp的协议数据不会丢,没有收完包,下次接收,会继续上次继续接收,己端总是在收到ack时才会清除缓冲区内容。数据是可靠的,但是会粘包。

二 粘包解决方式

为字节流加上自定义固定长度报头,报头中包含字节流长度,然后一次send到对端,对端在接收时,先从缓存中取出定长的报头,然后再取真实数据

struct模块
该模块可以把一个类型,如数字,转成固定长度的bytes
>>> struct.pack(‘i’,1111111111111)
。。。。。。。。。
struct.error: ‘i’ format requires -2147483648 <= number <= 2147483647 #这个是范围

3.粘包问题 - 图1

  1. import json,struct
  2. #假设通过客户端上传1T:1073741824000的文件a.txt
  3. #为避免粘包,必须自定制报头
  4. header={'file_size':1073741824000,'file_name':'/a/b/c/d/e/a.txt','md5':'8f6fbf8347faa4924a76856701edb0f3'} #1T数据,文件路径和md5值
  5. #为了该报头能传送,需要序列化并且转为bytes
  6. head_bytes=bytes(json.dumps(header),encoding='utf-8') #序列化并转成bytes,用于传输
  7. #为了让客户端知道报头的长度,用struck将报头长度这个数字转成固定长度:4个字节
  8. head_len_bytes=struct.pack('i',len(head_bytes)) #这4个字节里只包含了一个数字,该数字是报头的长度
  9. #客户端开始发送
  10. conn.send(head_len_bytes) #先发报头的长度,4个bytes
  11. conn.send(head_bytes) #再发报头的字节格式
  12. conn.sendall(文件内容) #然后发真实内容的字节格式
  13. #服务端开始接收
  14. head_len_bytes=s.recv(4) #先收报头4个bytes,得到报头长度的字节格式
  15. x=struct.unpack('i',head_len_bytes)[0] #提取报头的长度
  16. head_bytes=s.recv(x) #按照报头长度x,收取报头的bytes格式
  17. header=json.loads(json.dumps(header)) #提取报头
  18. #最后根据报头的内容提取真实的数据,比如
  19. real_data_len=s.recv(header['file_size'])
  20. s.recv(real_data_len)

struct的详细用法

  1. #_*_coding:utf-8_*_
  2. import struct
  3. import binascii
  4. import ctypes
  5. values1 = (1, 'abc'.encode('utf-8'), 2.7)
  6. values2 = ('defg'.encode('utf-8'),101)
  7. s1 = struct.Struct('I3sf')
  8. s2 = struct.Struct('4sI')
  9. print(s1.size,s2.size)
  10. prebuffer=ctypes.create_string_buffer(s1.size+s2.size)
  11. print('Before : ',binascii.hexlify(prebuffer))
  12. # t=binascii.hexlify('asdfaf'.encode('utf-8'))
  13. # print(t)
  14. s1.pack_into(prebuffer,0,*values1)
  15. s2.pack_into(prebuffer,s1.size,*values2)
  16. print('After pack',binascii.hexlify(prebuffer))
  17. print(s1.unpack_from(prebuffer,0))
  18. print(s2.unpack_from(prebuffer,s1.size))
  19. s3=struct.Struct('ii')
  20. s3.pack_into(prebuffer,0,123,123)
  21. print('After pack',binascii.hexlify(prebuffer))
  22. print(s3.unpack_from(prebuffer,0))

服务端自定义报头

  1. import socket,struct,json
  2. import subprocess
  3. phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  4. phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) #就是它,在bind前加
  5. phone.bind(('127.0.0.1',8080))
  6. phone.listen(5)
  7. while True:
  8. conn,addr=phone.accept()
  9. while True:
  10. cmd=conn.recv(1024)
  11. if not cmd:break
  12. print('cmd: %s' %cmd)
  13. res=subprocess.Popen(cmd.decode('utf-8'),
  14. shell=True,
  15. stdout=subprocess.PIPE,
  16. stderr=subprocess.PIPE)
  17. err=res.stderr.read()
  18. print(err)
  19. if err:
  20. back_msg=err
  21. else:
  22. back_msg=res.stdout.read()
  23. headers={'data_size':len(back_msg)}
  24. head_json=json.dumps(headers)
  25. head_json_bytes=bytes(head_json,encoding='utf-8')
  26. conn.send(struct.pack('i',len(head_json_bytes))) #先发报头的长度
  27. conn.send(head_json_bytes) #再发报头
  28. conn.sendall(back_msg) #在发真实的内容
  29. conn.close()

客户端自定义报头

  1. #_*_coding:utf-8_*_
  2. import socket,time,struct
  3. s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  4. res=s.connect_ex(('127.0.0.1',8080))
  5. while True:
  6. msg=input('>>: ').strip()
  7. if len(msg) == 0:continue
  8. if msg == 'quit':break
  9. s.send(msg.encode('utf-8'))
  10. l=s.recv(4)
  11. x=struct.unpack('i',l)[0]
  12. print(type(x),x)
  13. # print(struct.unpack('I',l))
  14. r_s=0
  15. data=b''
  16. while r_s < x:
  17. r_d=s.recv(1024)
  18. data+=r_d
  19. r_s+=len(r_d)
  20. # print(data.decode('utf-8'))
  21. print(data.decode('gbk')) #windows默认gbk编码

我们可以把报头做成字典,字典里包含将要发送的真实数据的详细信息,然后json序列化,然后用struck将序列化后的数据长度打包成4个字节(4个自己足够用了)
发送时:
先发报头长度
再编码报头内容然后发送
最后发真实内容

接收时:
先手报头长度,用struct取出来
根据取出的长度收取报头内容,然后解码,反序列化
从反序列化的结果中取出待取数据的详细信息,然后去取真实的数据内容

三 ftp文件上传下载练习

ftp服务端

  1. import socket
  2. import struct
  3. import json
  4. import subprocess
  5. import os
  6. class MYTCPServer:
  7. address_family = socket.AF_INET
  8. socket_type = socket.SOCK_STREAM
  9. allow_reuse_address = False
  10. max_packet_size = 8192
  11. coding='utf-8'
  12. request_queue_size = 5
  13. server_dir='file_upload'
  14. def __init__(self, server_address, bind_and_activate=True):
  15. """Constructor. May be extended, do not override."""
  16. self.server_address=server_address
  17. self.socket = socket.socket(self.address_family,
  18. self.socket_type)
  19. if bind_and_activate:
  20. try:
  21. self.server_bind()
  22. self.server_activate()
  23. except:
  24. self.server_close()
  25. raise
  26. def server_bind(self):
  27. """Called by constructor to bind the socket.
  28. """
  29. if self.allow_reuse_address:
  30. self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  31. self.socket.bind(self.server_address)
  32. self.server_address = self.socket.getsockname()
  33. def server_activate(self):
  34. """Called by constructor to activate the server.
  35. """
  36. self.socket.listen(self.request_queue_size)
  37. def server_close(self):
  38. """Called to clean-up the server.
  39. """
  40. self.socket.close()
  41. def get_request(self):
  42. """Get the request and client address from the socket.
  43. """
  44. return self.socket.accept()
  45. def close_request(self, request):
  46. """Called to clean up an individual request."""
  47. request.close()
  48. def run(self):
  49. while True:
  50. self.conn,self.client_addr=self.get_request()
  51. print('from client ',self.client_addr)
  52. while True:
  53. try:
  54. head_struct = self.conn.recv(4)
  55. if not head_struct:break
  56. head_len = struct.unpack('i', head_struct)[0]
  57. head_json = self.conn.recv(head_len).decode(self.coding)
  58. head_dic = json.loads(head_json)
  59. print(head_dic)
  60. #head_dic={'cmd':'put','filename':'a.txt','filesize':123123}
  61. cmd=head_dic['cmd']
  62. if hasattr(self,cmd):
  63. func=getattr(self,cmd)
  64. func(head_dic)
  65. except Exception:
  66. break
  67. def put(self,args):
  68. file_path=os.path.normpath(os.path.join(
  69. self.server_dir,
  70. args['filename']
  71. ))
  72. filesize=args['filesize']
  73. recv_size=0
  74. print('----->',file_path)
  75. with open(file_path,'wb') as f:
  76. while recv_size < filesize:
  77. recv_data=self.conn.recv(self.max_packet_size)
  78. f.write(recv_data)
  79. recv_size+=len(recv_data)
  80. print('recvsize:%s filesize:%s' %(recv_size,filesize))
  81. tcpserver1=MYTCPServer(('127.0.0.1',8080))
  82. tcpserver1.run()

ftp客户端

  1. import socket
  2. import struct
  3. import json
  4. import os
  5. class MYTCPClient:
  6. address_family = socket.AF_INET
  7. socket_type = socket.SOCK_STREAM
  8. allow_reuse_address = False
  9. max_packet_size = 8192
  10. coding='utf-8'
  11. request_queue_size = 5
  12. def __init__(self, server_address, connect=True):
  13. self.server_address=server_address
  14. self.socket = socket.socket(self.address_family,
  15. self.socket_type)
  16. if connect:
  17. try:
  18. self.client_connect()
  19. except:
  20. self.client_close()
  21. raise
  22. def client_connect(self):
  23. self.socket.connect(self.server_address)
  24. def client_close(self):
  25. self.socket.close()
  26. def run(self):
  27. while True:
  28. inp=input(">>: ").strip()
  29. if not inp:continue
  30. l=inp.split()
  31. cmd=l[0]
  32. if hasattr(self,cmd):
  33. func=getattr(self,cmd)
  34. func(l)
  35. def put(self,args):
  36. cmd=args[0]
  37. filename=args[1]
  38. if not os.path.isfile(filename):
  39. print('file:%s is not exists' %filename)
  40. return
  41. else:
  42. filesize=os.path.getsize(filename)
  43. head_dic={'cmd':cmd,'filename':os.path.basename(filename),'filesize':filesize}
  44. print(head_dic)
  45. head_json=json.dumps(head_dic)
  46. head_json_bytes=bytes(head_json,encoding=self.coding)
  47. head_struct=struct.pack('i',len(head_json_bytes))
  48. self.socket.send(head_struct)
  49. self.socket.send(head_json_bytes)
  50. send_size=0
  51. with open(filename,'rb') as f:
  52. for line in f:
  53. self.socket.send(line)
  54. send_size+=len(line)
  55. print(send_size)
  56. else:
  57. print('upload successful')
  58. client=MYTCPClient(('127.0.0.1',8080))
  59. client.run()