网络数据收发

功能需求:
- TCP客户端模式:
- 连接指定服务器
- 接收服务器的消息
- 给服务器发消息
- TCP服务器模式:
- 创建服务器
- 接收客户端消息
- 回复客户端消息
- UDP模式:
- 绑定本地IP和端口
- 向目标IP和端口发消息
- 清空消息记录
串口数据收发
依赖库:pyserial
源码:https://github.com/pyserial/pyserial
安装依赖:
pip install pyserial

功能需求:
- 扫描串口设备
- 进行串口连接配置
- 连接设备
- 接收串口消息
- 发送串口消息
使用**pyserial**封装的串口驱动:
from typing import List, Optional, Tuple

import serial
from serial import Serial
from serial.tools import list_ports


def scan_serial_ports() -> List[Tuple[str, str]]:
    """List the serial ports currently reported by pyserial.

    Returns:
        One ``(device, description)`` tuple per detected port.
    """
    # comports() yields ListPortInfo objects; besides device/description they
    # also expose hwid, vid, pid, serial_number and location.
    return [(port.device, port.description) for port in list_ports.comports()]


class SerialDevice:
    """Small wrapper around ``serial.Serial``.

    ``serial.SerialException`` raised by the underlying port is caught and
    printed; the methods then signal failure through their return value.
    """

    def __init__(self, port, baud_rate=9600, timeout=None):
        """Store the connection settings; the port is not opened here.

        Args:
            port: Port name handed to ``serial.Serial`` (e.g. ``"COM1"``).
            baud_rate: Baud rate handed to ``serial.Serial``.
            timeout: Read timeout handed to ``serial.Serial``.
        """
        self.port = port
        self.baud_rate = baud_rate
        self.timeout = timeout
        self.serial: Optional[Serial] = None  # set by open()

    def _ensure_open(self) -> bool:
        """Return True when the port is open; otherwise print a notice."""
        if self.serial is not None and self.serial.is_open:
            return True
        print("Serial port is not open.")
        return False

    def open(self):
        """Open the configured port.

        Returns:
            ``(ok, message)`` where ``ok`` is True on success and ``message``
            describes the outcome (the exception text on failure).
        """
        try:
            self.serial = serial.Serial(self.port, self.baud_rate, timeout=self.timeout)
        except serial.SerialException as e:
            print("请检查设备是否连接,或端口被其他软件占用,Failed to open serial port:", str(e))
            return False, str(e)
        if self.serial.is_open:
            return True, f"Serial port {self.port} opened successfully."
        return False, f"Serial port {self.port} open failed."

    def close(self):
        """Close the port if it is open; otherwise just print a notice."""
        if self.serial is not None and self.serial.is_open:
            self.serial.close()
            print("Serial port closed.")
        else:
            print("Serial port is not open.")

    def write(self, data):
        """Write ``data`` to the port.

        Args:
            data: Payload passed unchanged to ``Serial.write``.

        Returns:
            The value returned by ``Serial.write`` (number of bytes written)
            on success, or None when the port is not open or the write fails.
        """
        if not self._ensure_open():
            return None
        try:
            written = self.serial.write(data)
        except serial.SerialException as e:
            print("Failed to write data:", str(e))
            return None
        print("Data written:", data)
        return written

    def flush(self):
        """Flush the port's output buffer; returns None in every case."""
        if not self._ensure_open():
            return None
        try:
            self.serial.flush()
        except serial.SerialException as e:
            # Handled the same way as write()/read() so a device error
            # during flush does not propagate to the caller.
            print("Failed to flush data:", str(e))
        return None

    def read(self, num_bytes):
        """Read up to ``num_bytes`` bytes.

        Returns:
            The data returned by ``Serial.read``, or None when the port is
            not open or the read fails.
        """
        if not self._ensure_open():
            return None
        try:
            return self.serial.read(num_bytes)
        except serial.SerialException as e:
            print("Failed to read data:", str(e))
            return None

    def readline(self):
        """Read one line.

        Returns:
            The data returned by ``Serial.readline``, or None when the port
            is not open or the read fails.
        """
        if not self._ensure_open():
            return None
        try:
            return self.serial.readline()
        except serial.SerialException as e:
            print("Failed to read data:", str(e))
            return None

    def is_open(self):
        """Return whether the underlying port exists and is open."""
        if self.serial is None:
            return False
        return self.serial.is_open


if __name__ == '__main__':
    # Example usage: list the available ports.
    serial_ports = scan_serial_ports()
    if serial_ports:
        print("Available serial serial_ports:")
        for device, description in serial_ports:
            print(device, "---", description)
    else:
        print("No serial serial_ports found.")

    # Example usage: talk to a device.
    # sp = SerialDevice("COM1", baud_rate=9600, timeout=1)  # replace with your port name, baud rate and timeout
    # sp.open()
    # sp.write(b"Hello, Serial!")  # send data
    # print(sp.read(10))  # read 10 bytes
    # sp.close()
蓝牙数据收发
依赖库:pybluez2==0.37
源码:https://github.com/pybluez/pybluez
安装依赖:
pip install pybluez2==0.37

功能需求:
- 扫描蓝牙设备
- 连接蓝牙设备
- 接收蓝牙数据
- 发送蓝牙数据
利用**pybluez**封装的蓝牙驱动:
import bluetooth


class BluetoothDataTransfer:
    """RFCOMM client helper built on ``bluetooth.BluetoothSocket``.

    ``bluetooth.BluetoothError`` is caught and printed; the methods then
    signal failure through their return value.
    """

    def __init__(self, target_address, target_name, port=1):
        """Store the target device details; no connection is made here.

        Args:
            target_address: Bluetooth address of the target device.
            target_name: Device name, used only in log output.
            port: Port (channel) passed to ``socket.connect``.
        """
        self.target_address = target_address
        self.target_name = target_name
        self.port = port
        self.socket = None  # set by connect(), cleared by disconnect()

    def connect(self):
        """Connect to the target device.

        Returns:
            True on success, False when a ``BluetoothError`` occurs.
        """
        sock = None
        try:
            sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
            sock.connect((self.target_address, self.port))
        except bluetooth.BluetoothError as e:
            # Release the half-initialised socket instead of just dropping
            # the reference to it.
            if sock is not None:
                sock.close()
            self.socket = None
            print("Connection failed:", str(e))
            return False
        self.socket = sock
        print("Connected successfully. {} ({})".format(self.target_name, self.target_address))
        return True

    def disconnect(self):
        """Close the connection, if any."""
        if self.socket is None:
            return
        try:
            self.socket.close()
        finally:
            # Forget the closed socket so later send/receive calls report
            # "Not connected." rather than using a dead socket.
            self.socket = None
        print("Disconnected.")

    def send_data(self, data):
        """Send ``data`` to the connected device.

        Args:
            data: Payload passed unchanged to ``socket.send``.

        Returns:
            True when the send call succeeds, False otherwise.
        """
        if self.socket is None:
            print("Not connected.")
            return False
        try:
            self.socket.send(data)
        except bluetooth.BluetoothError as e:
            print("Failed to send data:", str(e))
            return False
        print("Data sent:", data)
        return True

    def receive_data(self, buffer_size=1024):
        """Receive data from the connected device.

        Args:
            buffer_size: Maximum number of bytes requested from ``recv``.

        Returns:
            The raw data returned by ``socket.recv``, or None when not
            connected or when a ``BluetoothError`` occurs.
        """
        if self.socket is None:
            print("Not connected.")
            return None
        try:
            data = self.socket.recv(buffer_size)
        except bluetooth.BluetoothError as e:
            print("Failed to receive data:", str(e))
            return None
        # Decode leniently for display only: a payload that is not valid
        # UTF-8 must not raise and lose the bytes that were received.
        print("Data received:", data.decode(errors="replace"))
        return data

    @staticmethod
    def scan_devices():
        """Discover nearby Bluetooth devices.

        Returns:
            A list of ``(address, name)`` tuples, one per discovered device.
        """
        print("Scanning devices...")
        device_list = []
        for addr in bluetooth.discover_devices():
            name = bluetooth.lookup_name(addr)
            print("Found device:", name, "(", addr, ")")
            device_list.append((addr, name))
        return device_list


if __name__ == '__main__':
    devices = BluetoothDataTransfer.scan_devices()
    for device in devices:
        print(device)

    # Example usage
    bd = BluetoothDataTransfer("48:E7:DA:AA:98:28", "LIN", 1)  # replace with the target device's Bluetooth address and port
    if bd.connect():
        try:
            bd.send_data("Hello, Bluetooth!")  # send data
            bd.receive_data()  # receive data
        finally:
            bd.disconnect()
