## Environment Setup

Install the Docker image provided for the RK356x; the required toolchain is preinstalled in it. We use this environment mainly to convert models into the RKNN format.

Reference links:

- https://github.com/airockchip/yolov5.git
- https://github.com/rockchip-linux/rknpu2
- https://github.com/rockchip-linux/rknn-toolkit2
### Loading Docker

Enter the toolkit directory:

```bash
cd rknn-toolkit2-1.x.x/docker/docker_full
```

Load the image file:

```bash
docker load --input rknn-toolkit2-1.x.x-cp36-docker.tar.gz
```

Check that it loaded successfully:

```bash
docker images
```

Create a container:

```bash
docker run -it --shm-size 4G -v D:\DATA\ml\rknn-toolkit2-1.5.0\examples\onnx\yolov5:/rknn_yolov5_demo rknn-toolkit2:1.5.0-ml /bin/bash
```

Here `-v <host_dir>:<container_dir>` maps a host directory to a directory inside the container. Once inside the container, we can generate the model.
On subsequent runs, start the existing container instead of creating a new one:

```bash
docker container ls -a
docker start -ai <container_name>
```
Disk usage can be checked with:

```bash
df -h
```
## Model Export

Inside the Docker container, enter the `rknn_yolov5_demo` folder, i.e. the directory mapped above:

```bash
cd /rknn_yolov5_demo
```

Run the example script:

```bash
python3 ./test.py
```

It automatically converts the ONNX model into an RKNN model, producing `yolov5s-640-640.rknn`.
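For reference, the conversion that `test.py` performs boils down to the standard RKNN Toolkit2 call sequence. The sketch below is a minimal outline, not the full example script; the preprocessing values and file names are assumptions based on the YOLOv5 example:

```python
# Minimal ONNX -> RKNN conversion sketch using the RKNN Toolkit2 API.
from rknn.api import RKNN

rknn = RKNN(verbose=True)
# Normalization must match the training preprocessing (here: /255, assumed).
rknn.config(mean_values=[[0, 0, 0]], std_values=[[255, 255, 255]],
            target_platform='rk3566')
rknn.load_onnx(model='./yolov5s.onnx')
# Quantization is calibrated with a list of images (dataset.txt in the example).
rknn.build(do_quantization=True, dataset='./dataset.txt')
rknn.export_rknn('./yolov5s-640-640.rknn')
rknn.release()
```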
## YOLOv5 Model Training

Start training:

```bash
python3 train.py --data data/lines/itheima_lines.yaml --weights yolov5s.pt --img 640
```

Export the model to ONNX (the `--rknpu` option comes from the airockchip yolov5 fork linked above):

```bash
python export.py --weights yolov5s.pt --include onnx --rknpu rk3566
```
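Before converting, it can be worth checking that the export produced raw feature-map heads rather than YOLOv5's decoded detection output, since that is what the RKNN post-processing code below expects. A minimal sketch using onnxruntime, assuming the export landed in `yolov5s.onnx`:

```python
# Sanity-check the exported model's outputs (assumes onnxruntime is installed).
import onnxruntime as ort

sess = ort.InferenceSession('yolov5s.onnx')
for out in sess.get_outputs():
    print(out.name, out.shape)
# For a 640x640 input, the RKNPU-style export is expected to yield three heads
# shaped roughly (1, 255, 80, 80), (1, 255, 40, 40), (1, 255, 20, 20).
```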
## LubanCat Board Side

RKNN Toolkit Lite2 is mainly used to deploy RKNN models on the Rockchip NPU. Before using RKNN Toolkit Lite2, you must first convert the models exported by each deep-learning framework into RKNN models with RKNN Toolkit2. The complete RKNN Toolkit2 package and documentation are available from:

https://github.com/rockchip-linux/rknn-toolkit2

### Environment Installation
Install the dependencies:

```bash
sudo apt-get install -y python3-opencv
sudo apt-get install -y python3-numpy
```

Install rknn_toolkit_lite2 for on-board inference:

```bash
pip3 install rknn_toolkit_lite2-1.x.0-cp310-cp310-linux_aarch64.whl
```
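A quick way to confirm the wheel matches the board's Python version and architecture is simply to import it:

```python
# If the wheel was built for a different Python version or arch, this import fails.
from rknnlite.api import RKNNLite

rknn_lite = RKNNLite()
print('rknn_toolkit_lite2 is ready')
```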
Copy the matching .so files from RKNPU2 onto the board, replacing the existing ones:

```bash
ls -al /usr/lib/librkn*
/usr/lib/librknn_api.so
/usr/lib/librknnrt.so
```
Run inference with the following script:

```python
import os
import urllib
import traceback
import time
import sys
import numpy as np
import cv2
# from rknn.api import RKNN  # full toolkit; not needed for on-board inference
from rknnlite.api import RKNNLite

RKNN_MODEL = 'yolov5s.rknn'
IMG_PATH = './bus.jpg'
DATASET = './dataset.txt'

QUANTIZE_ON = True

OBJ_THRESH = 0.25
NMS_THRESH = 0.45
IMG_SIZE = 640

CLASSES = ("person", "bicycle", "car", "motorbike", "aeroplane", "bus", "train",
           "truck", "boat", "traffic light", "fire hydrant", "stop sign",
           "parking meter", "bench", "bird", "cat", "dog", "horse", "sheep",
           "cow", "elephant", "bear", "zebra", "giraffe", "backpack", "umbrella",
           "handbag", "tie", "suitcase", "frisbee", "skis", "snowboard",
           "sports ball", "kite", "baseball bat", "baseball glove", "skateboard",
           "surfboard", "tennis racket", "bottle", "wine glass", "cup", "fork",
           "knife", "spoon", "bowl", "banana", "apple", "sandwich", "orange",
           "broccoli", "carrot", "hot dog", "pizza", "donut", "cake", "chair",
           "sofa", "pottedplant", "bed", "diningtable", "toilet", "tvmonitor",
           "laptop", "mouse", "remote", "keyboard", "cell phone", "microwave",
           "oven", "toaster", "sink", "refrigerator", "book", "clock", "vase",
           "scissors", "teddy bear", "hair drier", "toothbrush")
def sigmoid(x):
    return 1 / (1 + np.exp(-x))
def xywh2xyxy(x):
    # Convert [x, y, w, h] to [x1, y1, x2, y2]
    y = np.copy(x)
    y[:, 0] = x[:, 0] - x[:, 2] / 2  # top left x
    y[:, 1] = x[:, 1] - x[:, 3] / 2  # top left y
    y[:, 2] = x[:, 0] + x[:, 2] / 2  # bottom right x
    y[:, 3] = x[:, 1] + x[:, 3] / 2  # bottom right y
    return y
def process(input, mask, anchors):
    # Decode one output head: apply sigmoid, add the grid offsets, scale by stride.
    anchors = [anchors[i] for i in mask]
    grid_h, grid_w = map(int, input.shape[0:2])

    box_confidence = sigmoid(input[..., 4])
    box_confidence = np.expand_dims(box_confidence, axis=-1)

    box_class_probs = sigmoid(input[..., 5:])

    box_xy = sigmoid(input[..., :2]) * 2 - 0.5

    col = np.tile(np.arange(0, grid_w), grid_w).reshape(-1, grid_w)
    row = np.tile(np.arange(0, grid_h).reshape(-1, 1), grid_h)
    col = col.reshape(grid_h, grid_w, 1, 1).repeat(3, axis=-2)
    row = row.reshape(grid_h, grid_w, 1, 1).repeat(3, axis=-2)
    grid = np.concatenate((col, row), axis=-1)
    box_xy += grid
    box_xy *= int(IMG_SIZE / grid_h)

    box_wh = pow(sigmoid(input[..., 2:4]) * 2, 2)
    box_wh = box_wh * anchors

    box = np.concatenate((box_xy, box_wh), axis=-1)

    return box, box_confidence, box_class_probs
def filter_boxes(boxes, box_confidences, box_class_probs):
    """Filter boxes with the object threshold. Slightly different from the
    original yolov5 post-processing!

    # Arguments
        boxes: ndarray, boxes of objects.
        box_confidences: ndarray, confidences of objects.
        box_class_probs: ndarray, class_probs of objects.

    # Returns
        boxes: ndarray, filtered boxes.
        classes: ndarray, classes for boxes.
        scores: ndarray, scores for boxes.
    """
    boxes = boxes.reshape(-1, 4)
    box_confidences = box_confidences.reshape(-1)
    box_class_probs = box_class_probs.reshape(-1, box_class_probs.shape[-1])

    _box_pos = np.where(box_confidences >= OBJ_THRESH)
    boxes = boxes[_box_pos]
    box_confidences = box_confidences[_box_pos]
    box_class_probs = box_class_probs[_box_pos]

    class_max_score = np.max(box_class_probs, axis=-1)
    classes = np.argmax(box_class_probs, axis=-1)
    _class_pos = np.where(class_max_score >= OBJ_THRESH)

    boxes = boxes[_class_pos]
    classes = classes[_class_pos]
    scores = (class_max_score * box_confidences)[_class_pos]

    return boxes, classes, scores
def nms_boxes(boxes, scores):
    """Suppress non-maximal boxes.

    # Arguments
        boxes: ndarray, boxes of objects.
        scores: ndarray, scores of objects.

    # Returns
        keep: ndarray, index of effective boxes.
    """
    x = boxes[:, 0]
    y = boxes[:, 1]
    w = boxes[:, 2] - boxes[:, 0]
    h = boxes[:, 3] - boxes[:, 1]

    areas = w * h
    order = scores.argsort()[::-1]

    keep = []
    while order.size > 0:
        i = order[0]
        keep.append(i)

        xx1 = np.maximum(x[i], x[order[1:]])
        yy1 = np.maximum(y[i], y[order[1:]])
        xx2 = np.minimum(x[i] + w[i], x[order[1:]] + w[order[1:]])
        yy2 = np.minimum(y[i] + h[i], y[order[1:]] + h[order[1:]])

        w1 = np.maximum(0.0, xx2 - xx1 + 0.00001)
        h1 = np.maximum(0.0, yy2 - yy1 + 0.00001)
        inter = w1 * h1

        ovr = inter / (areas[i] + areas[order[1:]] - inter)
        inds = np.where(ovr <= NMS_THRESH)[0]
        order = order[inds + 1]
    keep = np.array(keep)
    return keep
def yolov5_post_process(input_data):
    masks = [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
    anchors = [[10, 13], [16, 30], [33, 23], [30, 61], [62, 45], [59, 119],
               [116, 90], [156, 198], [373, 326]]

    boxes, classes, scores = [], [], []
    for input, mask in zip(input_data, masks):
        b, c, s = process(input, mask, anchors)
        b, c, s = filter_boxes(b, c, s)
        boxes.append(b)
        classes.append(c)
        scores.append(s)

    boxes = np.concatenate(boxes)
    boxes = xywh2xyxy(boxes)
    classes = np.concatenate(classes)
    scores = np.concatenate(scores)

    # Per-class NMS
    nboxes, nclasses, nscores = [], [], []
    for c in set(classes):
        inds = np.where(classes == c)
        b = boxes[inds]
        c = classes[inds]
        s = scores[inds]

        keep = nms_boxes(b, s)

        nboxes.append(b[keep])
        nclasses.append(c[keep])
        nscores.append(s[keep])

    if not nclasses and not nscores:
        return None, None, None

    boxes = np.concatenate(nboxes)
    classes = np.concatenate(nclasses)
    scores = np.concatenate(nscores)

    return boxes, classes, scores
def draw(image, boxes, scores, classes):
    """Draw the boxes on the image.

    # Arguments
        image: original image.
        boxes: ndarray, boxes of objects.
        classes: ndarray, classes of objects.
        scores: ndarray, scores of objects.
    """
    for box, score, cl in zip(boxes, scores, classes):
        top, left, right, bottom = box
        print('class: {}, score: {}'.format(CLASSES[cl], score))
        print('box coordinate left,top,right,down: [{}, {}, {}, {}]'.format(top, left, right, bottom))
        top = int(top)
        left = int(left)
        right = int(right)
        bottom = int(bottom)

        cv2.rectangle(image, (top, left), (right, bottom), (255, 255, 0), 2)
        cv2.putText(image, '{0} {1:.2f}'.format(CLASSES[cl], score),
                    (top, left - 6),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.6, (0, 0, 255), 2)
def letterbox(im, new_shape=(640, 640), color=(0, 0, 0)):
    # Resize and pad image while meeting stride-multiple constraints
    shape = im.shape[:2]  # current shape [height, width]
    if isinstance(new_shape, int):
        new_shape = (new_shape, new_shape)

    # Scale ratio (new / old)
    r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])

    # Compute padding
    ratio = r, r  # width, height ratios
    new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
    dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]  # wh padding

    dw /= 2  # divide padding into 2 sides
    dh /= 2

    if shape[::-1] != new_unpad:  # resize
        im = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR)
    top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
    left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
    im = cv2.copyMakeBorder(im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)  # add border
    return im, ratio, (dw, dh)
if __name__ == '__main__':
    # Create RKNNLite object
    rknn = RKNNLite(verbose=True)

    print('--> Load RKNN model')
    ret = rknn.load_rknn(RKNN_MODEL)
    if ret != 0:
        print('Load RKNN model failed!')
        exit(ret)

    # Init runtime environment
    print('--> Init runtime environment')
    ret = rknn.init_runtime()
    # ret = rknn.init_runtime('rk3566')
    if ret != 0:
        print('Init runtime environment failed!')
        exit(ret)
    print('done')

    # Set inputs
    img = cv2.imread(IMG_PATH)
    # img, ratio, (dw, dh) = letterbox(img, new_shape=(IMG_SIZE, IMG_SIZE))
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, (IMG_SIZE, IMG_SIZE))

    # Inference
    print('--> Running model')
    outputs = rknn.inference(inputs=[img])
    np.save('./onnx_yolov5_0.npy', outputs[0])
    np.save('./onnx_yolov5_1.npy', outputs[1])
    np.save('./onnx_yolov5_2.npy', outputs[2])
    print('done')

    # Post-process: reshape each head to (3, 85, h, w), then to (h, w, 3, 85)
    input0_data = outputs[0]
    input1_data = outputs[1]
    input2_data = outputs[2]

    input0_data = input0_data.reshape([3, -1] + list(input0_data.shape[-2:]))
    input1_data = input1_data.reshape([3, -1] + list(input1_data.shape[-2:]))
    input2_data = input2_data.reshape([3, -1] + list(input2_data.shape[-2:]))

    input_data = list()
    input_data.append(np.transpose(input0_data, (2, 3, 0, 1)))
    input_data.append(np.transpose(input1_data, (2, 3, 0, 1)))
    input_data.append(np.transpose(input2_data, (2, 3, 0, 1)))

    boxes, classes, scores = yolov5_post_process(input_data)

    img_1 = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
    if boxes is not None:
        draw(img_1, boxes, scores, classes)
    cv2.imwrite('result.jpg', img_1)

    rknn.release()
```
### Multithreaded Frame Capture
```python
import os
import urllib
import traceback
import time
import sys
import numpy as np
import cv2
# from rknn.api import RKNN
from rknnlite.api import RKNNLite
import threading
import queue

# Custom unbuffered video-capture class
class VideoCapture:
    """Customized VideoCapture, always reads the latest frame"""

    def __init__(self, name):
        self.cap = cv2.VideoCapture(name)
        self.q = queue.Queue(maxsize=3)
        self.stop_threads = False  # to gracefully close the sub-thread
        th = threading.Thread(target=self._reader)
        th.daemon = True  # run the worker thread in the background
        th.start()

    # Read frames continuously, keeping only the most recent ones
    def _reader(self):
        while not self.stop_threads:
            ret, frame = self.cap.read()
            if not ret:
                break
            if not self.q.empty():
                try:
                    self.q.get_nowait()
                except queue.Empty:
                    pass
            self.q.put(frame)

    def read(self):
        return self.q.get()

    def terminate(self):
        self.stop_threads = True
        self.cap.release()

if __name__ == '__main__':
    capture = VideoCapture("/dev/video9")
    img = capture.read()
    while img is not None:
        cv2.imshow('src', img)
        img = capture.read()
        if ord("q") == cv2.waitKey(10):
            break
    capture.terminate()
```
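The `__main__` block above only displays frames. A minimal sketch of a main loop that wires this capture class into RKNNLite is shown below; it assumes the `VideoCapture` class above is in scope and that the helpers from the single-image script (`yolov5_post_process`, `draw`, `IMG_SIZE`) have been saved into a module, here hypothetically named `yolov5_infer`:

```python
# Sketch: run the model on the latest camera frame (names per assumptions above).
import cv2
import numpy as np
from rknnlite.api import RKNNLite
from yolov5_infer import yolov5_post_process, draw, IMG_SIZE  # hypothetical module

rknn = RKNNLite()
rknn.load_rknn('yolov5s.rknn')
rknn.init_runtime()

capture = VideoCapture('/dev/video9')
while True:
    frame = capture.read()
    img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, (IMG_SIZE, IMG_SIZE))
    outputs = rknn.inference(inputs=[img])
    # Reshape the three heads exactly as in the single-image script.
    input_data = [
        np.transpose(o.reshape([3, -1] + list(o.shape[-2:])), (2, 3, 0, 1))
        for o in outputs
    ]
    boxes, classes, scores = yolov5_post_process(input_data)
    show = cv2.resize(frame, (IMG_SIZE, IMG_SIZE))
    if boxes is not None:
        draw(show, boxes, scores, classes)
    cv2.imshow('detect', show)
    if cv2.waitKey(1) == ord('q'):
        break
capture.terminate()
rknn.release()
```

Because `VideoCapture` drops stale frames, the loop always processes a near-live frame even when inference runs slower than the camera.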
## Subscribing to the Camera in ROS

Create a workspace:

```bash
mkdir -p car_ws/src
```

Create a package:

```bash
cd car_ws/src
ros2 pkg create --build-type ament_python yolo_detect
```
- Add the node code in `yolo_detect/det.py` (a sketch that extends this node with RKNN inference follows after this list):

```python
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2 as cv

class SubNode(Node):
    def __init__(self):
        super().__init__('py_sub')
        self.create_subscription(Image, '/image_raw', self.call_back, 10)
        self.cv_bridge = CvBridge()

    def call_back(self, msg):
        # self.get_logger().info(msg.data)
        image = self.cv_bridge.imgmsg_to_cv2(msg, 'bgr8')
        cv.imshow("image", image)
        cv.waitKey(10)

def main():
    rclpy.init()
    node = SubNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()
```
- Edit `setup.py` so the package exposes the node as an executable:

```python
entry_points={
    'console_scripts': [
        'yolo_detect = yolo_detect.det:main',
    ],
},
```
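As referenced in the first list item, here is a sketch of how the subscriber callback could feed frames into RKNNLite instead of just displaying them. This is not the packaged `det.py`; the helpers (`yolov5_post_process`, `draw`, `IMG_SIZE`) are again assumed to live in a hypothetical `yolov5_infer` module:

```python
# Sketch: ROS 2 node that runs RKNN inference on each incoming image.
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2 as cv
import numpy as np
from rknnlite.api import RKNNLite
from yolov5_infer import yolov5_post_process, draw, IMG_SIZE  # hypothetical module

class DetectNode(Node):
    def __init__(self):
        super().__init__('yolo_detect')  # node name chosen for this sketch
        self.create_subscription(Image, '/image_raw', self.call_back, 10)
        self.cv_bridge = CvBridge()
        self.rknn = RKNNLite()
        self.rknn.load_rknn('yolov5s.rknn')
        self.rknn.init_runtime()

    def call_back(self, msg):
        image = self.cv_bridge.imgmsg_to_cv2(msg, 'bgr8')
        img = cv.resize(cv.cvtColor(image, cv.COLOR_BGR2RGB), (IMG_SIZE, IMG_SIZE))
        outputs = self.rknn.inference(inputs=[img])
        # Same head reshuffling as in the single-image script.
        input_data = [
            np.transpose(o.reshape([3, -1] + list(o.shape[-2:])), (2, 3, 0, 1))
            for o in outputs
        ]
        boxes, classes, scores = yolov5_post_process(input_data)
        show = cv.resize(image, (IMG_SIZE, IMG_SIZE))
        if boxes is not None:
            draw(show, boxes, scores, classes)
        cv.imshow('detect', show)
        cv.waitKey(1)

def main():
    rclpy.init()
    node = DetectNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()
```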
Build the package:

```bash
colcon build --packages-select yolo_detect
```

Start the node:

```bash
ros2 run yolo_detect yolo_detect
```
Import the usb_cam package into the workspace and build:

```bash
colcon build
source install/setup.bash
```

After the build completes, launch the camera:

```bash
ros2 launch usb_cam demo_launch.py
```
Common commands:

```bash
ros2 topic list
ros2 topic type /image_raw
ros2 interface show sensor_msgs/msg/Image
```