import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
import cv_bridge
import numpy as np
import cv2 as cv
from geometry_msgs.msg import Twist
import serial
import struct
import threading
import time
## User-defined parameters: (Update these values to your liking)
# Minimum size for a contour to be considered anything
MIN_AREA = 500
# Minimum size for a contour to be considered part of the track
MIN_AREA_TRACK = 5000
# Robot's speed when following the line
LINEAR_SPEED = 0.2
# Proportional constant to be applied on speed when turning
# (Multiplied by the error value)
KP = 1.5/100
# If the line is completely lost, the error value shall be compensated by:
LOSS_FACTOR = 1.2
# Send messages every $TIMER_PERIOD seconds
TIMER_PERIOD = 0.06
# When about to end the track, move for ~$FINALIZATION_PERIOD more seconds
FINALIZATION_PERIOD = 4
# The maximum error value for which the robot is still in a straight line
MAX_ERROR = 30
# BGR values to filter only the selected color range
lower_bgr_values = np.array([0, 0, 0])
upper_bgr_values = np.array([180, 255, 46])
finalization_countdown = None
right_mark_count = 0
class MyNode(Node):
def __init__(self,node_name):
super().__init__(node_name)
# 线速度和角速度
self.x = 0.0
self.z = 0.0
# 初始化摄像头配置
self.init_camera()
# 初始化串口
self.init_serial()
def init_serial(self):
# 创建串口对象
self.ser = serial.Serial("/dev/ttyS3",115200,timeout = 5)
self.ser.flushInput()
self.i = 0
# 需要发送的头 类型和数据长度
self.head = struct.pack('<b',0x7b)
self.instruction = struct.pack('<b',0x00)
self.length = struct.pack('<b',3)
self.tail = struct.pack('<b',0x7a)
self.t1 = threading.Thread(target=self.recv_speed)
self.t2 = threading.Thread(target=self.send_speed)
# 守护线程 主线程挂掉 子线程也挂掉 需要在start之前设置
self.t1.setDaemon(True)
self.t1.setDaemon(True)
self.t1.start()
self.t2.start()
# ---------------------------- 串口 ---------------------------- #
# TODO 发送真实的操作指令
def send_speed(self):
while True:
print('send_speed')
data = struct.pack('<f',self.x)+struct.pack('<f',0.0)+struct.pack('<f',self.z)
# 校验和校验
check_num = 0x00+3+self.x+0.0+self.z
check = struct.pack('<f',check_num)
# 需要发送的数据
data = self.head+self.instruction+self.length+data+check+self.tail
# 发送串口数据
self.ser.write(data)
time.sleep(0.06)
def recv_speed(self):
print('start')
print(f'is_open:{self.ser.is_open}')
# while True:
# print(f'i:{self.i}')
# self.i += 1
# time.sleep(0.06)
while True:
self.i += 1
# 获取需要接收的数据个数
num = self.ser.inWaiting()
# print(f'i:{self.i} start num:{num}')
# num = self.ser.in_waiting()
# if num<=0 or num != 48:
# continue
if num<=0 or num<48:
continue
if num>48:
# 清除
self.ser.read(num)
continue
# 正常的一帧数据
print(f'num:{num}')
data = self.ser.read(num)
# 解析数据
self.parse_speed(data)
print('end')
time.sleep(0.06)
# TODO 通过获取到的数据进行决策
def parse_speed(self,data):
# 帧头
head = struct.unpack('<B',data[:1])[0]
if head!=122:
return
# 命令
ins = struct.unpack('<B',data[1:2])[0]
if ins!=1:
return
# 数据长度
length = struct.unpack('<B',data[2:3])[0]
if length!=10:
return
# 数据
want_data = struct.unpack('<ffffffffff',data[3:43])
# 校验位
check = struct.unpack('<f',data[43:47])[0]
# 帧尾
tail = struct.unpack('<B',data[47:])[0]
# self.get_logger().info(f'i:{self.i},帧头:{head},命令:{ins},数据长度:{length},数据:{want_data},校验位:{check},帧尾:{tail}')
print(f'i:{self.i},帧头:{head},命令:{ins},数据长度:{length},数据:{want_data},校验位:{check},帧尾:{tail}')
print(f'i:{self.i} end')
# ---------------------------- 摄像头 ---------------------------- #
def init_camera(self):
# 摄像头数据
self.image_input = None
# 图像感兴趣区域
# self.crop_h_start = -1
# self.crop_h_stop = -1
self.crop_w_start = -1
# self.crop_w_stop = -1
# 订阅图像信息
self.subscription = self.create_subscription(Image, 'image_raw',
self.image_callback,
rclpy.qos.qos_profile_sensor_data)
# cvbridge
self.bridge = cv_bridge.CvBridge()
# 设定时间回调
self.timer = self.create_timer(0.06, self.timer_callback)
def image_callback(self,msg):
"""获取摄像头图像"""
self.image_input = self.bridge.imgmsg_to_cv2(msg,desired_encoding='bgr8')
# cv.imshow('img',self.image_input)
# cv.waitKey(20)
def crop_size(self,height, width):
"""
裁剪感兴趣的区域
"""
return (1*height//3, height, width//4, 3*width//4)
def get_contour_data(self,mask, out):
"""
Return the centroid of the largest contour in
the binary image 'mask' (the line)
and return the side in which the smaller camera_namecontour is (the track mark)
(If there are any of these contours),
and draw all contours on 'out' image
"""
# 获取轮廓
contours, _ = cv.findContours(mask, cv.RETR_EXTERNAL, cv.CHAIN_APPROX_NONE)
mark = {}
line = {}
for contour in contours:
M = cv.moments(contour)
# Search more about Image Moments on Wikipedia :)
if M['m00'] > MIN_AREA:
# if countor.area > MIN_AREA:
if (M['m00'] > MIN_AREA_TRACK):
# Contour is part of the track
line['x'] = self.crop_w_start + int(M["m10"]/M["m00"])
line['y'] = int(M["m01"]/M["m00"])
# plot the area in light blue
cv.drawContours(out, contour, -1, (255,255,0), 1)
cv.putText(out, str(M['m00']), (int(M["m10"]/M["m00"]), int(M["m01"]/M["m00"])),
cv.FONT_HERSHEY_PLAIN, 2, (255,255,0), 2)
else:
# Contour is a track mark
if (not mark) or (mark['y'] > int(M["m01"]/M["m00"])):
# if there are more than one mark, consider only
# the one closest to the robot
mark['y'] = int(M["m01"]/M["m00"])
mark['x'] = self.crop_w_start + int(M["m10"]/M["m00"])
# plot the area in pink
cv.drawContours(out, contour, -1, (255,0,255), 1)
cv.putText(out, str(M['m00']), (int(M["m10"]/M["m00"]), int(M["m01"]/M["m00"])),
cv.FONT_HERSHEY_PLAIN, 2, (255,0,255), 2)
if mark and line:
# if both contours exist
if mark['x'] > line['x']:
mark_side = "right"
else:
mark_side = "left"
else:
mark_side = None
return (line, mark_side)
def timer_callback(self):
"""
Function to be called when the timer ticks.
According to an image 'image_input', determine the speed of the robot
so it can follow the contour
"""
global error
# global image_input
global just_seen_line
global just_seen_right_mark
global should_move
global right_mark_count
global finalization_countdown
# Wait for the first image to be received
if type(self.image_input) != np.ndarray:
return
# 获取宽度和高度
height, width, _ = self.image_input.shape
image = self.image_input.copy()
# 裁剪感兴趣的区域
crop_h_start, crop_h_stop, self.crop_w_start, crop_w_stop = self.crop_size(height, width)
# 获取感兴趣的图片区域
crop = image[crop_h_start:crop_h_stop, self.crop_w_start:crop_w_stop]
# 获取二值化的图 非0表示直线
# (过滤颜色值 只看到轮廓)
mask = cv.inRange(crop, lower_bgr_values, upper_bgr_values)
# get the centroid of the biggest contour in the picture,
# and plot its detail on the cropped part of the output image
#获取图片中最大轮廓的质心,
#并在输出图像的裁剪部分绘制其细节
output = image
line, mark_side = self.get_contour_data(mask, output[crop_h_start:crop_h_stop, self.crop_w_start:crop_w_stop])
# also get the side in which the track mark "is"
message = Twist()
if line:
# if there even is a line in the image:
# (as the camera could not be reading any lines)
x = line['x']
# error:= The difference between the center of the image
# and the center of the line
error = x - width//2
message.linear.x = LINEAR_SPEED
just_seen_line = True
# plot the line centroid on the image
cv.circle(output, (line['x'], crop_h_start + line['y']), 5, (0,255,0), 7)
else:
# There is no line in the image.
# Turn on the spot to find it again.
if just_seen_line:
just_seen_line = False
error = error * LOSS_FACTOR
message.linear.x = 0.0
if mark_side != None:
print("mark_side: {}".format(mark_side))
if (mark_side == "right") and (finalization_countdown == None) and \
(abs(error) <= MAX_ERROR) and (not just_seen_right_mark):
right_mark_count += 1
if right_mark_count > 1:
# Start final countdown to stop the robot
finalization_countdown = int(FINALIZATION_PERIOD / TIMER_PERIOD) + 1
print("Finalization Process has begun!")
just_seen_right_mark = True
else:
just_seen_right_mark = False
# Determine the speed to turn and get the line in the center of the camera.
message.angular.z = float(error) * -KP
print("Error: {} | Angular Z: {}, ".format(error, message.angular.z))
# Plot the boundaries where the image was cropped
cv.rectangle(output, (self.crop_w_start, crop_h_start), (crop_w_stop, crop_h_stop), (0,0,255), 2)
# Uncomment to show the binary picture
#cv2.imshow("mask", mask)
# Show the output image to the user
cv.imshow("output", output)
# Print the image for 5milis, then resume execution
cv.waitKey(5)
# Check for final countdown
if finalization_countdown != None:
if finalization_countdown > 0:
finalization_countdown -= 1
elif finalization_countdown == 0:
should_move = False
self.x = message.linear.x
self.z = -message.angular.z
# Publish the message to 'cmd_vel'
# if should_move:
# publisher.publish(message)
# else:
# empty_message = Twist()
# publisher.publish(empty_message)
def main():
# 初始化rclpy
rclpy.init()
# 参数:节点名称
node = MyNode('control_node')
rclpy.spin(node)
# 节点代码执行完之后需要销毁节点
node.destroy_node()
# 关闭rclpy
rclpy.shutdown()
if __name__ == '__main__':
main()