import rclpyfrom rclpy.node import Nodefrom sensor_msgs.msg import Imageimport cv_bridgeimport numpy as npimport cv2 as cvfrom geometry_msgs.msg import Twistimport serialimport structimport threadingimport time## User-defined parameters: (Update these values to your liking)# Minimum size for a contour to be considered anythingMIN_AREA = 500 # Minimum size for a contour to be considered part of the trackMIN_AREA_TRACK = 5000# Robot's speed when following the lineLINEAR_SPEED = 0.2# Proportional constant to be applied on speed when turning # (Multiplied by the error value)KP = 1.5/100 # If the line is completely lost, the error value shall be compensated by:LOSS_FACTOR = 1.2# Send messages every $TIMER_PERIOD secondsTIMER_PERIOD = 0.06# When about to end the track, move for ~$FINALIZATION_PERIOD more secondsFINALIZATION_PERIOD = 4# The maximum error value for which the robot is still in a straight lineMAX_ERROR = 30# BGR values to filter only the selected color rangelower_bgr_values = np.array([0, 0, 0])upper_bgr_values = np.array([180, 255, 46])finalization_countdown = Noneright_mark_count = 0class MyNode(Node): def __init__(self,node_name): super().__init__(node_name) # 线速度和角速度 self.x = 0.0 self.z = 0.0 # 初始化摄像头配置 self.init_camera() # 初始化串口 self.init_serial() def init_serial(self): # 创建串口对象 self.ser = serial.Serial("/dev/ttyS3",115200,timeout = 5) self.ser.flushInput() self.i = 0 # 需要发送的头 类型和数据长度 self.head = struct.pack('<b',0x7b) self.instruction = struct.pack('<b',0x00) self.length = struct.pack('<b',3) self.tail = struct.pack('<b',0x7a) self.t1 = threading.Thread(target=self.recv_speed) self.t2 = threading.Thread(target=self.send_speed) # 守护线程 主线程挂掉 子线程也挂掉 需要在start之前设置 self.t1.setDaemon(True) self.t1.setDaemon(True) self.t1.start() self.t2.start() # ---------------------------- 串口 ---------------------------- # # TODO 发送真实的操作指令 def send_speed(self): while True: print('send_speed') data = struct.pack('<f',self.x)+struct.pack('<f',0.0)+struct.pack('<f',self.z) # 校验和校验 check_num = 0x00+3+self.x+0.0+self.z check = struct.pack('<f',check_num) # 需要发送的数据 data = self.head+self.instruction+self.length+data+check+self.tail # 发送串口数据 self.ser.write(data) time.sleep(0.06) def recv_speed(self): print('start') print(f'is_open:{self.ser.is_open}') # while True: # print(f'i:{self.i}') # self.i += 1 # time.sleep(0.06) while True: self.i += 1 # 获取需要接收的数据个数 num = self.ser.inWaiting() # print(f'i:{self.i} start num:{num}') # num = self.ser.in_waiting() # if num<=0 or num != 48: # continue if num<=0 or num<48: continue if num>48: # 清除 self.ser.read(num) continue # 正常的一帧数据 print(f'num:{num}') data = self.ser.read(num) # 解析数据 self.parse_speed(data) print('end') time.sleep(0.06) # TODO 通过获取到的数据进行决策 def parse_speed(self,data): # 帧头 head = struct.unpack('<B',data[:1])[0] if head!=122: return # 命令 ins = struct.unpack('<B',data[1:2])[0] if ins!=1: return # 数据长度 length = struct.unpack('<B',data[2:3])[0] if length!=10: return # 数据 want_data = struct.unpack('<ffffffffff',data[3:43]) # 校验位 check = struct.unpack('<f',data[43:47])[0] # 帧尾 tail = struct.unpack('<B',data[47:])[0] # self.get_logger().info(f'i:{self.i},帧头:{head},命令:{ins},数据长度:{length},数据:{want_data},校验位:{check},帧尾:{tail}') print(f'i:{self.i},帧头:{head},命令:{ins},数据长度:{length},数据:{want_data},校验位:{check},帧尾:{tail}') print(f'i:{self.i} end') # ---------------------------- 摄像头 ---------------------------- # def init_camera(self): # 摄像头数据 self.image_input = None # 图像感兴趣区域 # self.crop_h_start = -1 # self.crop_h_stop = -1 self.crop_w_start = -1 # self.crop_w_stop = -1 # 订阅图像信息 self.subscription = self.create_subscription(Image, 'image_raw', self.image_callback, rclpy.qos.qos_profile_sensor_data) # cvbridge self.bridge = cv_bridge.CvBridge() # 设定时间回调 self.timer = self.create_timer(0.06, self.timer_callback) def image_callback(self,msg): """获取摄像头图像""" self.image_input = self.bridge.imgmsg_to_cv2(msg,desired_encoding='bgr8') # cv.imshow('img',self.image_input) # cv.waitKey(20) def crop_size(self,height, width): """ 裁剪感兴趣的区域 """ return (1*height//3, height, width//4, 3*width//4) def get_contour_data(self,mask, out): """ Return the centroid of the largest contour in the binary image 'mask' (the line) and return the side in which the smaller camera_namecontour is (the track mark) (If there are any of these contours), and draw all contours on 'out' image """ # 获取轮廓 contours, _ = cv.findContours(mask, cv.RETR_EXTERNAL, cv.CHAIN_APPROX_NONE) mark = {} line = {} for contour in contours: M = cv.moments(contour) # Search more about Image Moments on Wikipedia :) if M['m00'] > MIN_AREA: # if countor.area > MIN_AREA: if (M['m00'] > MIN_AREA_TRACK): # Contour is part of the track line['x'] = self.crop_w_start + int(M["m10"]/M["m00"]) line['y'] = int(M["m01"]/M["m00"]) # plot the area in light blue cv.drawContours(out, contour, -1, (255,255,0), 1) cv.putText(out, str(M['m00']), (int(M["m10"]/M["m00"]), int(M["m01"]/M["m00"])), cv.FONT_HERSHEY_PLAIN, 2, (255,255,0), 2) else: # Contour is a track mark if (not mark) or (mark['y'] > int(M["m01"]/M["m00"])): # if there are more than one mark, consider only # the one closest to the robot mark['y'] = int(M["m01"]/M["m00"]) mark['x'] = self.crop_w_start + int(M["m10"]/M["m00"]) # plot the area in pink cv.drawContours(out, contour, -1, (255,0,255), 1) cv.putText(out, str(M['m00']), (int(M["m10"]/M["m00"]), int(M["m01"]/M["m00"])), cv.FONT_HERSHEY_PLAIN, 2, (255,0,255), 2) if mark and line: # if both contours exist if mark['x'] > line['x']: mark_side = "right" else: mark_side = "left" else: mark_side = None return (line, mark_side) def timer_callback(self): """ Function to be called when the timer ticks. According to an image 'image_input', determine the speed of the robot so it can follow the contour """ global error # global image_input global just_seen_line global just_seen_right_mark global should_move global right_mark_count global finalization_countdown # Wait for the first image to be received if type(self.image_input) != np.ndarray: return # 获取宽度和高度 height, width, _ = self.image_input.shape image = self.image_input.copy() # 裁剪感兴趣的区域 crop_h_start, crop_h_stop, self.crop_w_start, crop_w_stop = self.crop_size(height, width) # 获取感兴趣的图片区域 crop = image[crop_h_start:crop_h_stop, self.crop_w_start:crop_w_stop] # 获取二值化的图 非0表示直线 # (过滤颜色值 只看到轮廓) mask = cv.inRange(crop, lower_bgr_values, upper_bgr_values) # get the centroid of the biggest contour in the picture, # and plot its detail on the cropped part of the output image #获取图片中最大轮廓的质心, #并在输出图像的裁剪部分绘制其细节 output = image line, mark_side = self.get_contour_data(mask, output[crop_h_start:crop_h_stop, self.crop_w_start:crop_w_stop]) # also get the side in which the track mark "is" message = Twist() if line: # if there even is a line in the image: # (as the camera could not be reading any lines) x = line['x'] # error:= The difference between the center of the image # and the center of the line error = x - width//2 message.linear.x = LINEAR_SPEED just_seen_line = True # plot the line centroid on the image cv.circle(output, (line['x'], crop_h_start + line['y']), 5, (0,255,0), 7) else: # There is no line in the image. # Turn on the spot to find it again. if just_seen_line: just_seen_line = False error = error * LOSS_FACTOR message.linear.x = 0.0 if mark_side != None: print("mark_side: {}".format(mark_side)) if (mark_side == "right") and (finalization_countdown == None) and \ (abs(error) <= MAX_ERROR) and (not just_seen_right_mark): right_mark_count += 1 if right_mark_count > 1: # Start final countdown to stop the robot finalization_countdown = int(FINALIZATION_PERIOD / TIMER_PERIOD) + 1 print("Finalization Process has begun!") just_seen_right_mark = True else: just_seen_right_mark = False # Determine the speed to turn and get the line in the center of the camera. message.angular.z = float(error) * -KP print("Error: {} | Angular Z: {}, ".format(error, message.angular.z)) # Plot the boundaries where the image was cropped cv.rectangle(output, (self.crop_w_start, crop_h_start), (crop_w_stop, crop_h_stop), (0,0,255), 2) # Uncomment to show the binary picture #cv2.imshow("mask", mask) # Show the output image to the user cv.imshow("output", output) # Print the image for 5milis, then resume execution cv.waitKey(5) # Check for final countdown if finalization_countdown != None: if finalization_countdown > 0: finalization_countdown -= 1 elif finalization_countdown == 0: should_move = False self.x = message.linear.x self.z = -message.angular.z # Publish the message to 'cmd_vel' # if should_move: # publisher.publish(message) # else: # empty_message = Twist() # publisher.publish(empty_message)def main(): # 初始化rclpy rclpy.init() # 参数:节点名称 node = MyNode('control_node') rclpy.spin(node) # 节点代码执行完之后需要销毁节点 node.destroy_node() # 关闭rclpy rclpy.shutdown()if __name__ == '__main__': main()