网络数据收发
功能需求:
- TCP客户端模式:
- 连接指定服务器
- 接收服务器的消息
- 给服务器发消息
- TCP服务器模式:
- 创建服务器
- 接收客户端消息
- 回复客户端消息
- UDP模式:
- 绑定本地IP和端口
- 向目标IP和端口发消息
- 清空消息记录
串口数据收发
依赖库:pyserial
源码:https://github.com/pyserial/pyserial
安装依赖:
pip install pyserial
功能需求:
- 扫描串口设备
- 进行串口连接配置
- 连接设备
- 接收串口消息
- 发送串口消息
使用**pyserial**
封装的串口驱动:
import serial
from serial import Serial
from serial.tools import list_ports
def scan_serial_ports():
    """Enumerate the serial ports reported by ``list_ports.comports()``.

    Each reported port object also carries ``hwid``, ``vid``, ``pid``,
    ``serial_number`` and ``location``; only the two fields below are kept.

    :return: list of ``(device, description)`` tuples, one per port.
    """
    found = []
    for info in list_ports.comports():
        found.append((info.device, info.description))
    return found
class SerialDevice:
    """Convenience wrapper around a pyserial ``Serial`` port.

    Errors are reported by printing a message and returning a sentinel
    (``None`` / ``False``) instead of raising, so callers must check the
    return values.
    """

    def __init__(self, port, baud_rate=9600, timeout=None):
        """Store the connection settings; the port is only opened by ``open()``.

        :param port: port name handed to ``serial.Serial`` (e.g. ``"COM1"``).
        :param baud_rate: baud rate handed to ``serial.Serial``.
        :param timeout: ``timeout`` keyword handed to ``serial.Serial``.
        """
        self.port = port
        self.baud_rate = baud_rate
        self.timeout = timeout
        # Populated by open(); stays None until the first successful open.
        self.serial: Serial = None

    def open(self):
        """Open the configured port.

        :return: ``(ok, message)`` where ``ok`` is True only if the port
            reports itself open, and ``message`` describes the outcome.
        """
        try:
            self.serial = serial.Serial(self.port, self.baud_rate, timeout=self.timeout)
        except serial.SerialException as e:
            print("请检查设备是否连接,或端口被其他软件占用,Failed to open serial port:", str(e))
            return False, str(e)
        if self.serial.is_open:
            return True, f"Serial port {self.port} opened successfully."
        return False, f"Serial port {self.port} open failed."

    def close(self):
        """Close the port if it is open; otherwise just report that it is not."""
        if self.is_open():
            self.serial.close()
            print("Serial port closed.")
        else:
            print("Serial port is not open.")

    def write(self, data):
        """Write ``data`` to the port.

        :param data: payload passed unchanged to the underlying ``write``.
        :return: whatever the underlying port's ``write`` returns (pyserial
            documents this as the number of bytes written), or ``None`` when
            the port is not open or the write fails. Returning the result
            lets callers tell success from failure.
        """
        if not self.is_open():
            print("Serial port is not open.")
            return None
        try:
            written = self.serial.write(data)
        except serial.SerialException as e:
            print("Failed to write data:", str(e))
            return None
        print("Data written:", data)
        return written

    def flush(self):
        """Flush the underlying port; a no-op with a message if it is not open.

        :return: always ``None``.
        """
        if not self.is_open():
            print("Serial port is not open.")
            return None
        try:
            self.serial.flush()
        except serial.SerialException as e:
            # Handled the same way as write/read so a dropped device does not
            # crash the caller from this one method only.
            print("Failed to flush data:", str(e))
        return None

    def read(self, num_bytes):
        """Read up to ``num_bytes`` from the port.

        :param num_bytes: size argument passed to the underlying ``read``.
        :return: the data read, or ``None`` if the port is not open or the
            read fails.
        """
        if not self.is_open():
            print("Serial port is not open.")
            return None
        try:
            return self.serial.read(num_bytes)
        except serial.SerialException as e:
            print("Failed to read data:", str(e))
            return None

    def readline(self):
        """Read one line from the port.

        :return: the line read, or ``None`` if the port is not open or the
            read fails.
        """
        if not self.is_open():
            print("Serial port is not open.")
            return None
        try:
            return self.serial.readline()
        except serial.SerialException as e:
            print("Failed to read data:", str(e))
            return None

    def is_open(self):
        """Return the port's ``is_open`` flag, or False if no port object exists."""
        if not self.serial:
            return False
        return self.serial.is_open
if __name__ == '__main__':
    # Example: list every serial port the scan reports.
    serial_ports = scan_serial_ports()
    if serial_ports:
        print("Available serial ports:")
        for device, description in serial_ports:
            print(device, "---", description)
    else:
        print("No serial ports found.")

    # Example: open a port, exchange data, then close it.
    # Replace the port name, baud rate and timeout with your own values.
    # sp = SerialDevice("COM1", baud_rate=9600, timeout=1)
    # sp.open()
    # sp.write(b"Hello, Serial!")  # send data
    # print(sp.read(10))  # read 10 bytes
    # sp.close()
蓝牙数据收发
依赖库:pybluez2==0.37
源码:https://github.com/pybluez/pybluez
安装依赖:
pip install pybluez2==0.37
功能需求:
- 扫描蓝牙设备
- 连接蓝牙设备
- 接收蓝牙数据
- 发送蓝牙数据
利用**pybluez**
封装的蓝牙驱动:
import bluetooth
# 扫描所有设备
class BluetoothDataTransfer:
    """RFCOMM client built on the ``bluetooth`` module.

    Errors are reported by printing a message and returning ``False`` /
    ``None`` instead of raising. ``self.socket`` is ``None`` whenever there
    is no usable connection.
    """

    def __init__(self, target_address, target_name, port=1):
        """Store the target settings; no connection is made here.

        :param target_address: address of the remote device, used by ``connect()``.
        :param target_name: device name, used only in the status message.
        :param port: port passed to the socket's ``connect`` with the address.
        """
        self.target_address = target_address
        self.target_name = target_name
        self.port = port
        self.socket = None

    def connect(self):
        """Connect to the target device.

        :return: True on success, False if a ``BluetoothError`` occurred.
        """
        sock = None
        try:
            sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
            sock.connect((self.target_address, self.port))
        except bluetooth.BluetoothError as e:
            # Release the half-created socket instead of leaking it.
            if sock is not None:
                sock.close()
            self.socket = None
            print("Connection failed:", str(e))
            return False
        self.socket = sock
        print("Connected successfully. {} ({})".format(
            self.target_name, self.target_address
        ))
        return True

    def disconnect(self):
        """Close the connection if there is one; safe to call repeatedly."""
        if self.socket is not None:
            try:
                self.socket.close()
            finally:
                # Forget the closed socket so later send/receive calls report
                # "Not connected." rather than using a dead handle.
                self.socket = None
            print("Disconnected.")

    def send_data(self, data):
        """Send ``data`` over the connection.

        :param data: payload passed unchanged to the socket's ``send``.
        :return: True if the send completed, False if not connected or a
            ``BluetoothError`` occurred.
        """
        if self.socket is None:
            print("Not connected.")
            return False
        try:
            self.socket.send(data)
        except bluetooth.BluetoothError as e:
            print("Failed to send data:", str(e))
            return False
        print("Data sent:", data)
        return True

    def receive_data(self, buffer_size=1024):
        """Receive one chunk of data from the connection.

        :param buffer_size: size argument passed to the socket's ``recv``.
        :return: the raw data received, or ``None`` if not connected or a
            ``BluetoothError`` occurred.
        """
        if self.socket is None:
            print("Not connected.")
            return None
        try:
            data = self.socket.recv(buffer_size)
        except bluetooth.BluetoothError as e:
            print("Failed to receive data:", str(e))
            return None
        # The decode is only for the log line; undecodable bytes must not
        # turn a successful receive into an exception.
        print("Data received:", data.decode(errors="replace"))
        return data

    @staticmethod
    def scan_devices():
        """Scan for Bluetooth devices and look up each one's name.

        :return: list of ``(address, name)`` tuples, one per discovered device.
        """
        # Announce before scanning so the message appears while the scan runs.
        print("Scanning devices...")
        device_list = []
        for addr in bluetooth.discover_devices():
            name = bluetooth.lookup_name(addr)
            print("Found device:", name, "(", addr, ")")
            device_list.append((addr, name))
        return device_list
if __name__ == '__main__':
    # Print every (address, name) pair returned by the scan.
    for entry in BluetoothDataTransfer.scan_devices():
        print(entry)

    # Example usage: replace the address and port with those of the target device.
    link = BluetoothDataTransfer("48:E7:DA:AA:98:28", "LIN", 1)
    connected = link.connect()
    if connected:
        link.send_data("Hello, Bluetooth!")  # send data
        link.receive_data()  # receive data
        link.disconnect()