环境搭建

安装rk356x提供好的docker镜像,在该镜像中已经安装好了所需要的环境, 我们使用这个环境主要是将模型转换成rknn格式
参考链接:
https://github.com/airockchip/yolov5.git
https://github.com/rockchip-linux/rknpu2
https://github.com/rockchip-linux/rknn-toolkit2

加载docker

  1. 进到环境目录

    1. rknn-toolkit2-1.x.x/docker/docker_full
  2. 加载镜像文件

    1. docker load --input rknn-toolkit2-1.x.x-cp36-docker.tar.gz
  3. 查看是否加载成功

    1. docker images
  4. 创建容器

    1. docker run -it --shm-size 4G -v D:\DATA\ml\rknn-toolkit2-1.5.0\examples\onnx\yolov5:/rknn_yolov5_demo rknn-toolkit2:1.5.0-ml /bin/bash

    其中”-v 主机目录:虚拟机目录” 是将本机目录映射到docker中的目录, 成功进到docker目录之后, 我们就可以来生成模型了

  5. 以后每次启动的时候,启动对应的容器

    1. docker container ls
    2. docker start -ai <container_name>

    可以使用如下命令查看内存使用

    1. df -h

    模型导出

    在docker容器中进入到rknn_yolov5_demo文件夹,也就是上面映射的文件夹

    1. cd /rknn_yolov5_demo

    执行示例代码:

    1. python3 ./test.py

    它会自动将onnx格式的模型转换成rknn格式的模型

    1. yolov5s-640-640.rknn

    yolov5模型训练

    启动训练

    1. python3 train.py --data data/lines/itheima_lines.yaml --weights yolov5s.pt --img 640

    模型导出为onnx

    1. python export.py --weights yolov5s.pt --include onnx --rknpu rk3566

    再将模型导出为

    鲁班猫板端

    RKNN Toolkit Lite2 主要用于 RKNN 模型在 Rockchip NPU 上的部署。
    在使用 RKNN Toolkit Lite2 之前,用户需要先通过 RKNN Toolkit2 将各深度学习框架导出的模型转成 RKNN 模型。
    RKNN Toolkit2 完整的安装包和使用文档可以从以下链接获取:
    https://github.com/rockchip-linux/rknn-toolkit2

    环境安装

  6. 安装依赖

    1. sudo apt-get install -y python3-opencv
    2. sudo apt-get install -y python3-numpy
  7. 安装rknn_toolkit_lite2板端推理

    1. pip3 install rknn_toolkit_lite2-1.x.0-cp310-cp310-linux_aarch64.whl
  8. 将RKNPU2中的对应的so文件复制到开发板中进行替换

    1. ls -al /usr/lib/librkn*
    2. /usr/lib/librknn_api.so
    3. /usr/lib/librknnrt.so
  9. 使用代码进行推理 ```python import os import urllib import traceback import time import sys import numpy as np import cv2

    from rknn.api import RKNN

    from rknnlite.api import RKNNLite

RKNN_MODEL = ‘yolov8n.rknn’ IMG_PATH = ‘./bus.jpg’ DATASET = ‘./dataset.txt’

QUANTIZE_ON = True

OBJ_THRESH = 0.25 NMS_THRESH = 0.45 IMG_SIZE = 640

CLASSES = (“person”, “bicycle”, “car”, “motorbike “, “aeroplane “, “bus “, “train”, “truck “, “boat”, “traffic light”, “fire hydrant”, “stop sign “, “parking meter”, “bench”, “bird”, “cat”, “dog “, “horse “, “sheep”, “cow”, “elephant”, “bear”, “zebra “, “giraffe”, “backpack”, “umbrella”, “handbag”, “tie”, “suitcase”, “frisbee”, “skis”, “snowboard”, “sports ball”, “kite”, “baseball bat”, “baseball glove”, “skateboard”, “surfboard”, “tennis racket”, “bottle”, “wine glass”, “cup”, “fork”, “knife “, “spoon”, “bowl”, “banana”, “apple”, “sandwich”, “orange”, “broccoli”, “carrot”, “hot dog”, “pizza “, “donut”, “cake”, “chair”, “sofa”, “pottedplant”, “bed”, “diningtable”, “toilet “, “tvmonitor”, “laptop “, “mouse “, “remote “, “keyboard “, “cell phone”, “microwave “, “oven “, “toaster”, “sink”, “refrigerator “, “book”, “clock”, “vase”, “scissors “, “teddy bear “, “hair drier”, “toothbrush “)

def sigmoid(x): return 1 / (1 + np.exp(-x))

def xywh2xyxy(x):

  1. # Convert [x, y, w, h] to [x1, y1, x2, y2]
  2. y = np.copy(x)
  3. y[:, 0] = x[:, 0] - x[:, 2] / 2 # top left x
  4. y[:, 1] = x[:, 1] - x[:, 3] / 2 # top left y
  5. y[:, 2] = x[:, 0] + x[:, 2] / 2 # bottom right x
  6. y[:, 3] = x[:, 1] + x[:, 3] / 2 # bottom right y
  7. return y

def process(input, mask, anchors):

  1. anchors = [anchors[i] for i in mask]
  2. grid_h, grid_w = map(int, input.shape[0:2])
  3. box_confidence = sigmoid(input[..., 4])
  4. box_confidence = np.expand_dims(box_confidence, axis=-1)
  5. box_class_probs = sigmoid(input[..., 5:])
  6. box_xy = sigmoid(input[..., :2])*2 - 0.5
  7. col = np.tile(np.arange(0, grid_w), grid_w).reshape(-1, grid_w)
  8. row = np.tile(np.arange(0, grid_h).reshape(-1, 1), grid_h)
  9. col = col.reshape(grid_h, grid_w, 1, 1).repeat(3, axis=-2)
  10. row = row.reshape(grid_h, grid_w, 1, 1).repeat(3, axis=-2)
  11. grid = np.concatenate((col, row), axis=-1)
  12. box_xy += grid
  13. box_xy *= int(IMG_SIZE/grid_h)
  14. box_wh = pow(sigmoid(input[..., 2:4])*2, 2)
  15. box_wh = box_wh * anchors
  16. box = np.concatenate((box_xy, box_wh), axis=-1)
  17. return box, box_confidence, box_class_probs

def filter_boxes(boxes, box_confidences, box_class_probs): “””Filter boxes with box threshold. It’s a bit different with origin yolov5 post process!

  1. # Arguments
  2. boxes: ndarray, boxes of objects.
  3. box_confidences: ndarray, confidences of objects.
  4. box_class_probs: ndarray, class_probs of objects.
  5. # Returns
  6. boxes: ndarray, filtered boxes.
  7. classes: ndarray, classes for boxes.
  8. scores: ndarray, scores for boxes.
  9. """
  10. boxes = boxes.reshape(-1, 4)
  11. box_confidences = box_confidences.reshape(-1)
  12. box_class_probs = box_class_probs.reshape(-1, box_class_probs.shape[-1])
  13. _box_pos = np.where(box_confidences >= OBJ_THRESH)
  14. boxes = boxes[_box_pos]
  15. box_confidences = box_confidences[_box_pos]
  16. box_class_probs = box_class_probs[_box_pos]
  17. class_max_score = np.max(box_class_probs, axis=-1)
  18. classes = np.argmax(box_class_probs, axis=-1)
  19. _class_pos = np.where(class_max_score >= OBJ_THRESH)
  20. boxes = boxes[_class_pos]
  21. classes = classes[_class_pos]
  22. scores = (class_max_score* box_confidences)[_class_pos]
  23. return boxes, classes, scores

def nms_boxes(boxes, scores): “””Suppress non-maximal boxes.

  1. # Arguments
  2. boxes: ndarray, boxes of objects.
  3. scores: ndarray, scores of objects.
  4. # Returns
  5. keep: ndarray, index of effective boxes.
  6. """
  7. x = boxes[:, 0]
  8. y = boxes[:, 1]
  9. w = boxes[:, 2] - boxes[:, 0]
  10. h = boxes[:, 3] - boxes[:, 1]
  11. areas = w * h
  12. order = scores.argsort()[::-1]
  13. keep = []
  14. while order.size > 0:
  15. i = order[0]
  16. keep.append(i)
  17. xx1 = np.maximum(x[i], x[order[1:]])
  18. yy1 = np.maximum(y[i], y[order[1:]])
  19. xx2 = np.minimum(x[i] + w[i], x[order[1:]] + w[order[1:]])
  20. yy2 = np.minimum(y[i] + h[i], y[order[1:]] + h[order[1:]])
  21. w1 = np.maximum(0.0, xx2 - xx1 + 0.00001)
  22. h1 = np.maximum(0.0, yy2 - yy1 + 0.00001)
  23. inter = w1 * h1
  24. ovr = inter / (areas[i] + areas[order[1:]] - inter)
  25. inds = np.where(ovr <= NMS_THRESH)[0]
  26. order = order[inds + 1]
  27. keep = np.array(keep)
  28. return keep

def yolov5_post_process(input_data): masks = [[0, 1, 2], [3, 4, 5], [6, 7, 8]] anchors = [[10, 13], [16, 30], [33, 23], [30, 61], [62, 45], [59, 119], [116, 90], [156, 198], [373, 326]]

  1. boxes, classes, scores = [], [], []
  2. for input, mask in zip(input_data, masks):
  3. b, c, s = process(input, mask, anchors)
  4. b, c, s = filter_boxes(b, c, s)
  5. boxes.append(b)
  6. classes.append(c)
  7. scores.append(s)
  8. boxes = np.concatenate(boxes)
  9. boxes = xywh2xyxy(boxes)
  10. classes = np.concatenate(classes)
  11. scores = np.concatenate(scores)
  12. nboxes, nclasses, nscores = [], [], []
  13. for c in set(classes):
  14. inds = np.where(classes == c)
  15. b = boxes[inds]
  16. c = classes[inds]
  17. s = scores[inds]
  18. keep = nms_boxes(b, s)
  19. nboxes.append(b[keep])
  20. nclasses.append(c[keep])
  21. nscores.append(s[keep])
  22. if not nclasses and not nscores:
  23. return None, None, None
  24. boxes = np.concatenate(nboxes)
  25. classes = np.concatenate(nclasses)
  26. scores = np.concatenate(nscores)
  27. return boxes, classes, scores

def draw(image, boxes, scores, classes): “””Draw the boxes on the image.

  1. # Argument:
  2. image: original image.
  3. boxes: ndarray, boxes of objects.
  4. classes: ndarray, classes of objects.
  5. scores: ndarray, scores of objects.
  6. all_classes: all classes name.
  7. """
  8. for box, score, cl in zip(boxes, scores, classes):
  9. top, left, right, bottom = box
  10. print('class: {}, score: {}'.format(CLASSES[cl], score))
  11. print('box coordinate left,top,right,down: [{}, {}, {}, {}]'.format(top, left, right, bottom))
  12. top = int(top)
  13. left = int(left)
  14. right = int(right)
  15. bottom = int(bottom)
  16. cv2.rectangle(image, (top, left), (right, bottom), (255, 255, 0), 2)
  17. cv2.putText(image, '{0} {1:.2f}'.format(CLASSES[cl], score),
  18. (top, left - 6),
  19. cv2.FONT_HERSHEY_SIMPLEX,
  20. 0.6, (0, 0, 255), 2)

def letterbox(im, new_shape=(640, 640), color=(0, 0, 0)):

  1. # Resize and pad image while meeting stride-multiple constraints
  2. shape = im.shape[:2] # current shape [height, width]
  3. if isinstance(new_shape, int):
  4. new_shape = (new_shape, new_shape)
  5. # Scale ratio (new / old)
  6. r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
  7. # Compute padding
  8. ratio = r, r # width, height ratios
  9. new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
  10. dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1] # wh padding
  11. dw /= 2 # divide padding into 2 sides
  12. dh /= 2
  13. if shape[::-1] != new_unpad: # resize
  14. im = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR)
  15. top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
  16. left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
  17. im = cv2.copyMakeBorder(im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color) # add border
  18. return im, ratio, (dw, dh)

if name == ‘main‘:

  1. # Create RKNN object
  2. rknn = RKNNLite(verbose=True)
  3. print('--> Load RKNN model')
  4. ret = rknn.load_rknn("yolov5s.rknn")
  5. # Init runtime environment
  6. print('--> Init runtime environment')
  7. ret = rknn.init_runtime()
  8. # ret = rknn.init_runtime('rk3566')
  9. if ret != 0:
  10. print('Init runtime environment failed!')
  11. exit(ret)
  12. print('done')
  13. # Set inputs
  14. img = cv2.imread(IMG_PATH)
  15. # img, ratio, (dw, dh) = letterbox(img, new_shape=(IMG_SIZE, IMG_SIZE))
  16. img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
  17. img = cv2.resize(img, (IMG_SIZE, IMG_SIZE))
  18. # Inference
  19. print('--> Running model')
  20. outputs = rknn.inference(inputs=[img])
  21. np.save('./onnx_yolov5_0.npy', outputs[0])
  22. np.save('./onnx_yolov5_1.npy', outputs[1])
  23. np.save('./onnx_yolov5_2.npy', outputs[2])
  24. print('done')
  25. # post process
  26. input0_data = outputs[0]
  27. input1_data = outputs[1]
  28. input2_data = outputs[2]
  29. input0_data = input0_data.reshape([3, -1]+list(input0_data.shape[-2:]))
  30. input1_data = input1_data.reshape([3, -1]+list(input1_data.shape[-2:]))
  31. input2_data = input2_data.reshape([3, -1]+list(input2_data.shape[-2:]))
  32. input_data = list()
  33. input_data.append(np.transpose(input0_data, (2, 3, 0, 1)))
  34. input_data.append(np.transpose(input1_data, (2, 3, 0, 1)))
  35. input_data.append(np.transpose(input2_data, (2, 3, 0, 1)))
  36. boxes, classes, scores = yolov5_post_process(input_data)
  37. img_1 = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
  38. if boxes is not None:
  39. draw(img_1, boxes, scores, classes)
  40. cv2.imwrite('result.jpg', img_1)
  41. rknn.release()
  1. <a name="ehmjK"></a>
  2. ### 多线程读取
  3. ```python
  4. import os
  5. import urllib
  6. import traceback
  7. import time
  8. import sys
  9. import numpy as np
  10. import cv2
  11. # from rknn.api import RKNN
  12. from rknnlite.api import RKNNLite
  13. import threading
  14. import queue
  15. # 自定义无缓存读视频类
  16. class VideoCapture:
  17. """Customized VideoCapture, always read latest frame"""
  18. def __init__(self, name):
  19. self.cap = cv2.VideoCapture(name)
  20. self.q = queue.Queue(maxsize=3)
  21. self.stop_threads = False # to gracefully close sub-thread
  22. th = threading.Thread(target=self._reader)
  23. th.daemon = True # 设置工作线程为后台运行
  24. th.start()
  25. # 实时读帧,只保存最后一帧
  26. def _reader(self):
  27. while not self.stop_threads:
  28. ret, frame = self.cap.read()
  29. if not ret:
  30. break
  31. if not self.q.empty():
  32. try:
  33. self.q.get_nowait()
  34. except queue.Empty:
  35. pass
  36. self.q.put(frame)
  37. def read(self):
  38. return self.q.get()
  39. def terminate(self):
  40. self.stop_threads = True
  41. self.cap.release()
  42. if __name__ == '__main__':
  43. capture = VideoCapture("/dev/video9")
  44. img = capture.read()
  45. while img is not None:
  46. cv2.imshow('src',img_1)
  47. img = capture.read()
  48. if ord("q") == cv2.waitKey(10):
  49. break;
  50. capture.terminate()

ROS订阅摄像头

  1. 创建工作空间

    1. mkdir -p car_ws/src
  2. 创建包

    1. ros2 pkg create --build-type ament_python yolo_detect
  • 导入代码 ```python import rclpy from rclpy.node import Node from std_msgs.msg import String from sensor_msgs.msg import Image from cv_bridge import CvBridge import cv2 as cv

class SubNode(Node): def init(self): super(SubNode, self).init(‘py_sub’) self.create_subscription(Image,’/image_raw’,self.call_back,10) self.cv_bridge = CvBridge()

  1. def call_back(self,msg):
  2. # self.get_logger().info(msg.data)
  3. image = self.cv_bridge.imgmsg_to_cv2(msg, 'bgr8')
  4. cv.imshow("image",image)
  5. cv.waitKey(10)

def main(): rclpy.init() node = SubNode() rclpy.spin(node) node.destroy_node() rclpy.shutdown()

if name == ‘main‘: main()

  1. - 修改配置setup.py
  2. ```python
  3. entry_points={
  4. 'console_scripts': [
  5. 'yolo_detect = yolo_detect.det:main',
  6. ],
  7. },

编译包

  1. colcon build --packages-select yolo_detect

启动节点

  1. ros2 run yolo_detect yolo_detect

导入usb_cam包

  1. colcon build
  2. source install/setup.bash

编译完成之后

  1. ros2 launch usb_cam demo_launch.py

常用指令

  1. ros2 topic list
  2. ros2 topic type /image_raw
  3. ros2 interface show sensor_msgs/msg/Image