网络数据收发

image.png
功能需求:

  • TCP客户端模式:
    • 连接指定服务器
    • 接收服务器的消息
    • 给服务器发消息
  • TCP服务器模式:
    • 创建服务器
    • 接收客户端消息
    • 回复客户端消息
  • UDP模式:
    • 绑定本地IP和端口
    • 向目标IP和端口发消息
  • 清空消息记录

串口数据收发

依赖库:pyserial
源码:https://github.com/pyserial/pyserial
安装依赖:

  1. pip install pyserial

image.png
功能需求:

  • 扫描串口设备
  • 进行串口连接配置
  • 连接设备
  • 接收串口消息
  • 发送串口消息

使用**pyserial**封装的串口驱动:

  1. import serial
  2. from serial import Serial
  3. from serial.tools import list_ports
  def scan_serial_ports():
      """Enumerate the serial ports reported by ``list_ports.comports()``.

      Returns:
          A list of ``(device, description)`` tuples, one per detected port.
      """
      found = []
      # Each port entry also carries hwid, vid, pid, serial_number and
      # location; only the device name and description are kept here.
      for info in list_ports.comports():
          found.append((info.device, info.description))
      return found
  class SerialDevice:
      """Small wrapper around a pyserial ``serial.Serial`` port.

      Most methods print a diagnostic and return a sentinel value instead of
      raising when the port is not open or when pyserial reports a
      ``serial.SerialException``. ``flush`` is the exception: errors raised by
      the underlying flush propagate to the caller.
      """

      def __init__(self, port, baud_rate=9600, timeout=None):
          """Store the connection settings; the port is not opened here.

          Args:
              port: Port name handed to ``serial.Serial``.
              baud_rate: Baud rate handed to ``serial.Serial``.
              timeout: Read timeout handed to ``serial.Serial``.
          """
          self.port = port
          self.baud_rate = baud_rate
          self.timeout = timeout
          # Underlying pyserial handle; stays None until open() succeeds.
          self.serial: "Serial | None" = None

      def open(self):
          """Open the port using the stored settings.

          A handle that is still open from an earlier call is closed first so
          that it is not leaked when ``self.serial`` is replaced.

          Returns:
              A ``(success, message)`` tuple.
          """
          try:
              if self.serial is not None and self.serial.is_open:
                  self.serial.close()
              self.serial = serial.Serial(self.port, self.baud_rate, timeout=self.timeout)
              if self.serial.is_open:
                  return True, f"Serial port {self.port} opened successfully."
          except serial.SerialException as e:
              print("请检查设备是否连接,或端口被其他软件占用,Failed to open serial port:", str(e))
              return False, str(e)
          return False, f"Serial port {self.port} open failed."

      def close(self):
          """Close the port if it is open; otherwise just report that it is not."""
          if self.serial and self.serial.is_open:
              self.serial.close()
              print("Serial port closed.")
          else:
              print("Serial port is not open.")

      def write(self, data):
          """Write ``data`` to the port.

          Returns:
              Whatever ``Serial.write`` returns (the number of bytes written)
              on success, or None when the port is not open or the write
              raised ``serial.SerialException``.
          """
          if not self.is_open():
              print("Serial port is not open.")
              return None
          try:
              written = self.serial.write(data)
          except serial.SerialException as e:
              print("Failed to write data:", str(e))
              return None
          print("Data written:", data)
          # Returning the count lets callers tell success apart from failure.
          return written

      def flush(self):
          """Flush the port; returns None. Does nothing but report if not open."""
          if not self.is_open():
              print("Serial port is not open.")
              return None
          self.serial.flush()
          return None

      def read(self, num_bytes):
          """Call ``Serial.read(num_bytes)`` and return its result.

          Returns:
              The data read, or None when the port is not open or the read
              raised ``serial.SerialException``.
          """
          if not self.is_open():
              print("Serial port is not open.")
              return None
          try:
              return self.serial.read(num_bytes)
          except serial.SerialException as e:
              print("Failed to read data:", str(e))
              return None

      def readline(self):
          """Call ``Serial.readline()`` and return its result.

          Returns:
              The line read, or None when the port is not open or the read
              raised ``serial.SerialException``.
          """
          if not self.is_open():
              print("Serial port is not open.")
              return None
          try:
              return self.serial.readline()
          except serial.SerialException as e:
              print("Failed to read data:", str(e))
              return None

      def is_open(self):
          """Return whether a handle exists and reports itself as open."""
          if not self.serial:
              return False
          return self.serial.is_open
  if __name__ == '__main__':
      # Example usage: list every serial port that can be detected.
      serial_ports = scan_serial_ports()
      if not serial_ports:
          print("No serial serial_ports found.")
      else:
          print("Available serial serial_ports:")
          for port_name, port_description in serial_ports:
              print(port_name, "---", port_description)
      # Example usage of SerialDevice (substitute your own port name,
      # baud rate and timeout):
      # sp = SerialDevice("COM1", baud_rate=9600, timeout=1)
      # sp.open()
      # sp.write(b"Hello, Serial!")  # send data
      # print(sp.read(10))  # read 10 bytes
      # sp.close()

蓝牙数据收发

依赖库:pybluez2==0.37
源码:https://github.com/pybluez/pybluez
安装依赖:

  1. pip install pybluez2==0.37

image.png
功能需求:

  • 扫描蓝牙设备
  • 连接蓝牙设备
  • 接收蓝牙数据
  • 发送蓝牙数据

利用**pybluez**封装的蓝牙驱动:

  1. import bluetooth
  2. # 扫描所有设备
  class BluetoothDataTransfer:
      """RFCOMM client for one remote device, built on the ``bluetooth`` module.

      Failures raised as ``bluetooth.BluetoothError`` are printed and turned
      into return values rather than propagated.
      """

      def __init__(self, target_address, target_name, port=1):
          """Remember the target; no connection is made here.

          Args:
              target_address: Address passed to the socket's ``connect``.
              target_name: Name used only in the success message.
              port: Port passed to the socket's ``connect``.
          """
          self.target_address = target_address
          self.target_name = target_name
          self.port = port
          # Connected socket, or None while disconnected.
          self.socket = None

      def connect(self):
          """Connect to the target device.

          Returns:
              True on success, False when a ``bluetooth.BluetoothError`` occurs.
          """
          # Drop a socket left over from an earlier connect() so it is not leaked.
          if self.socket is not None:
              self.socket.close()
              self.socket = None
          sock = None
          try:
              sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
              sock.connect((self.target_address, self.port))
          except bluetooth.BluetoothError as e:
              # Close the half-initialised socket instead of just forgetting it.
              if sock is not None:
                  sock.close()
              print("Connection failed:", str(e))
              return False
          self.socket = sock
          print("Connected successfully. {} ({})".format(
              self.target_name, self.target_address
          ))
          return True

      def disconnect(self):
          """Close the connection, if any, and mark the object as disconnected."""
          if self.socket is not None:
              self.socket.close()
              # Reset so later send/receive calls report "Not connected."
              # instead of operating on a closed socket.
              self.socket = None
              print("Disconnected.")

      def send_data(self, data):
          """Send ``data`` over the connection.

          Returns:
              True when the send call completed, False when not connected or
              when a ``bluetooth.BluetoothError`` occurred.
          """
          if self.socket is None:
              print("Not connected.")
              return False
          try:
              self.socket.send(data)
          except bluetooth.BluetoothError as e:
              print("Failed to send data:", str(e))
              return False
          print("Data sent:", data)
          return True

      def receive_data(self, buffer_size=1024):
          """Receive up to ``buffer_size`` bytes from the connection.

          Returns:
              The received data, or None when not connected or when a
              ``bluetooth.BluetoothError`` occurred.
          """
          if self.socket is None:
              print("Not connected.")
              return None
          try:
              data = self.socket.recv(buffer_size)
          except bluetooth.BluetoothError as e:
              print("Failed to receive data:", str(e))
              return None
          # The decode is only for the log line; errors="replace" keeps a
          # non-UTF-8 payload from raising and losing the received data.
          print("Data received:", data.decode(errors="replace"))
          return data

      @staticmethod
      def scan_devices():
          """Discover nearby devices and look up each one's name.

          Returns:
              A list of ``(address, name)`` tuples.
          """
          # Announce before the discovery call, not after it has finished.
          print("Scanning devices...")
          device_list = []
          for addr in bluetooth.discover_devices():
              name = bluetooth.lookup_name(addr)
              print("Found device:", name, "(", addr, ")")
              device_list.append((addr, name))
          return device_list
  if __name__ == '__main__':
      # Demo: list the devices found by a scan, then exchange one message
      # with a fixed target device.
      devices = BluetoothDataTransfer.scan_devices()
      for device in devices:
          print(device)
      # Example usage: replace with the target device's Bluetooth address,
      # name and port number.
      bd = BluetoothDataTransfer("48:E7:DA:AA:98:28", "LIN", 1)
      if bd.connect():
          try:
              bd.send_data("Hello, Bluetooth!")  # send data
              bd.receive_data()  # receive data
          finally:
              # Release the socket even if sending or receiving raises.
              bd.disconnect()
  87. bd.disconnect()