System Environment
- Ubuntu 18.04 LTS
- Python 3.7.6
- PyTorch 1.4.0
- CUDA 10.1
- cuDNN 7.6.5
An equivalent environment on Windows also works. The fan driver on my machine does not behave well under Linux, so the GPU runs too hot there; the training code runs fine on Windows as well.
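To confirm the toolchain matches the versions above, a quick check like the following can be run (a minimal sketch; it only prints what the installed PyTorch build reports):

import torch

# Report the versions the installed PyTorch build was compiled against
print("PyTorch:", torch.__version__)
print("CUDA available:", torch.cuda.is_available())
print("CUDA build:", torch.version.cuda)
print("cuDNN:", torch.backends.cudnn.version())
if torch.cuda.is_available():
    print("GPU:", torch.cuda.get_device_name(0))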
Building the Dataset
The dataset lives in the 素材 (assets) directory. The structure of its num subdirectory is shown below.
.
└── num
    ├── 0-original          negative-sample originals
    │   └── 0
    ├── 0-processed         processed negative samples
    │   └── 0
    ├── num1-original       digit-1 originals
    │   └── num1
    ├── num1-processed      processed digit-1 images
    │   └── num1
    ├── num2-original       digit-2 originals
    │   └── num2
    ├── num2-processed      processed digit-2 images
    │   └── num2
    ├── num3-original       digit-3 originals
    │   └── num3
    ├── num3-processed      processed digit-3 images
    │   └── num3
    ├── num4-original       digit-4 originals
    │   └── num4
    ├── num4-processed      processed digit-4 images
    │   └── num4
    ├── num5-original       digit-5 originals
    │   └── num5
    ├── num5-processed      processed digit-5 images
    │   └── num5
    ├── sentinel-original   sentinel-marker originals
    │   └── sentinel
    └── sentinel-processed  processed sentinel-marker images
        └── sentinel
Based on this directory structure, build the dataset by writing each image path and its class label into a CSV file:
"""
@author starrysky
@date 2020/08/16
@details 制作数据集标签
"""
import pandas as pd
import os
df = pd.DataFrame({
"image_path": [],
"label": [],
})
# 素材文件目录路径
src_dir = r"./素材/num/0-processed/0/"
files = os.listdir(src_dir)
for i in files:
# print(src_dir + i)
df.loc[df.shape[0] + 1] = {
"image_path": src_dir + i,
"label": 0,
}
dir_type_list = ["close", "closeh", "far", "farh"]
label_type_list = ["num1", "num2", "num3", "num4", "num5", "sentinel"]
for num in range(0, 6):
for dir_name in dir_type_list:
# 素材文件目录路径
src_dir = r"./素材/num/" + label_type_list[num] + "-processed/" + label_type_list[num] + "/" + dir_name + "/"
files = os.listdir(src_dir)
for i in files:
# print(src_dir + i)
df.loc[df.shape[0] + 1] = {
"image_path": src_dir + i,
"label": num + 1,
}
# print(df)
df.to_csv("./素材/num/label.csv")
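With label.csv written, a quick sanity check of the class balance is worth doing before training (a small sketch over the file just generated; df_check is an illustrative name):

# Check how many samples each class ended up with
df_check = pd.read_csv("./素材/num/label.csv", index_col=0)
print(df_check["label"].value_counts().sort_index())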
Defining a Custom Dataset in PyTorch
Building a dataset in PyTorch is mainly a matter of converting samples to Tensor format. First inherit from the torch.utils.data.Dataset base class, then implement the magic methods PyTorch requires: __init__, __getitem__, and __len__.
import os
import sys
import time
import torch
import pandas as pd
from torch import nn
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

trans = transforms.ToTensor()


def default_loader(path):
    """
    Load an image as a 28x28 single-channel grayscale image.
    :param path: image path
    :return: the loaded image
    """
    return Image.open(path).convert('L').resize((28, 28))


class MyDataset(Dataset):
    """
    Custom dataset
    """
    def __init__(self, csv_path, transform=None, loader=default_loader):
        """
        :param csv_path: path to the label CSV file
        :param transform: transform applied to each sample (e.g. ToTensor)
        :param loader: image loading function
        """
        super(MyDataset, self).__init__()
        df = pd.read_csv(csv_path, engine="python", encoding="utf-8")
        self.df = df
        self.transform = transform
        self.loader = loader

    def __getitem__(self, index):
        """
        Fetch the sample at the given index.
        Args:
            index: sample index
        Returns:
            feature tensor and label
        """
        fn = self.df.iloc[index][1]
        label = self.df.iloc[index][2]
        # Load the image from its path
        img = self.loader(fn)
        if self.transform is not None:
            # Convert the image to a Tensor
            img = self.transform(img)
        return img, label

    def __len__(self):
        """
        Returns:
            the number of samples
        """
        return len(self.df)
With the class defined, we can create a dataset object and print a batch to inspect it.
# Path to the dataset metadata file
metadata_path = r"./素材/num/label.csv"
# Batch size
batch_size = 64
# Number of worker threads
if sys.platform == "win32":
    num_workers = 0
else:
    num_workers = 12
# Fraction of data used for training
train_rate = 0.8

# Create the dataset
src_data = MyDataset(csv_path=metadata_path, transform=trans)
print('num_of_trainData:', len(src_data))
# Random train/test split
train_size = int(train_rate * len(src_data))
test_size = len(src_data) - train_size
train_set, test_set = torch.utils.data.random_split(src_data, [train_size, test_size])
train_iter = DataLoader(dataset=train_set, batch_size=batch_size, shuffle=True, num_workers=num_workers)
test_iter = DataLoader(dataset=test_set, batch_size=batch_size, shuffle=True, num_workers=num_workers)
# Print one batch of the dataset
for i, j in train_iter:
    print(i, j)
    break
The output below shows one batch of 64 samples.
num_of_trainData: 8120
tensor([[[[0.0039, 0.0000, 0.0000, ..., 0.0000, 0.0000, 0.0000],
[0.0000, 0.0157, 0.0000, ..., 0.0000, 0.0000, 0.0000],
[0.0039, 0.0000, 0.0078, ..., 0.0000, 0.0000, 0.0000],
...,
[0.0000, 0.0000, 0.0000, ..., 0.0000, 0.0000, 0.0000],
[0.0000, 0.0000, 0.0000, ..., 0.0000, 0.0000, 0.0000],
[0.0000, 0.0000, 0.0000, ..., 0.0000, 0.0000, 0.0000]]],
[[[0.9922, 1.0000, 0.0000, ..., 0.0039, 0.0000, 0.0039],
[1.0000, 0.9725, 0.0000, ..., 0.0078, 0.0000, 0.0235],
[0.0000, 0.0000, 0.0039, ..., 0.0000, 0.0000, 0.0000],
...,
[0.0000, 0.0000, 0.0000, ..., 0.0000, 0.0196, 0.0078],
[0.0000, 0.0000, 0.0000, ..., 0.0235, 0.0078, 0.0000],
[0.0000, 0.0000, 0.0000, ..., 0.0039, 0.0000, 1.0000]]],
[[[0.0000, 0.0000, 0.0000, ..., 0.0078, 0.0000, 0.0000],
[0.0000, 0.0000, 0.0000, ..., 0.0000, 0.0000, 0.0039],
[0.0000, 0.0000, 0.0000, ..., 0.0157, 0.0039, 0.0000],
...,
[0.0000, 0.0000, 0.0157, ..., 0.0000, 0.0000, 0.0078],
[0.0039, 0.0118, 0.0000, ..., 0.0549, 0.0000, 0.0000],
[0.0000, 0.0039, 0.0000, ..., 0.0000, 0.0118, 0.0235]]],
...,
[[[0.9373, 0.0118, 0.0000, ..., 0.0000, 0.0000, 0.0000],
[0.1529, 0.1255, 0.1059, ..., 0.0000, 0.0000, 0.0000],
[0.9961, 1.0000, 0.7412, ..., 0.0000, 0.0000, 0.0000],
...,
[0.0275, 0.5765, 0.5804, ..., 0.0000, 0.0000, 0.0000],
[0.0078, 0.0000, 0.0039, ..., 0.0000, 0.0000, 0.0000],
[0.0000, 0.0000, 0.0000, ..., 0.0000, 0.0000, 0.0000]]],
[[[1.0000, 0.9961, 1.0000, ..., 1.0000, 1.0000, 1.0000],
[0.9961, 1.0000, 0.9804, ..., 1.0000, 1.0000, 1.0000],
[0.9961, 1.0000, 0.9961, ..., 0.9961, 0.9961, 0.9961],
...,
[0.9961, 1.0000, 1.0000, ..., 1.0000, 1.0000, 1.0000],
[1.0000, 0.9843, 1.0000, ..., 1.0000, 1.0000, 1.0000],
[0.0000, 1.0000, 1.0000, ..., 0.0000, 0.0000, 0.0000]]],
[[[0.0000, 0.0000, 0.0078, ..., 0.9922, 1.0000, 1.0000],
[0.0000, 0.0039, 0.0078, ..., 1.0000, 0.9961, 1.0000],
[0.0000, 0.0000, 0.2980, ..., 0.9961, 0.9843, 0.9922],
...,
[0.0039, 0.0039, 0.0078, ..., 1.0000, 1.0000, 1.0000],
[0.0000, 0.0000, 0.0000, ..., 1.0000, 1.0000, 0.9647],
[0.0078, 0.0039, 0.0157, ..., 1.0000, 0.9843, 1.0000]]]]) tensor([4., 6., 0., 6., 6., 4., 6., 2., 4., 6., 4., 6., 2., 6., 2., 1., 5., 4.,
2., 2., 4., 3., 2., 1., 3., 3., 2., 6., 4., 3., 5., 4., 6., 2., 2., 5.,
1., 2., 1., 1., 6., 5., 6., 4., 3., 2., 2., 3., 4., 1., 5., 5., 2., 6.,
2., 2., 5., 3., 3., 5., 2., 3., 6., 6.])
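Raw tensor values are hard to judge by eye. If matplotlib is installed (an assumption; it is not otherwise used in this project), the same batch can be rendered as an image grid:

import matplotlib.pyplot as plt
from torchvision.utils import make_grid

# Lay out the first 16 samples of a batch in two rows of 8
images, labels = next(iter(train_iter))
grid = make_grid(images[:16], nrow=8)
# make_grid returns a 3-channel tensor; move channels last for imshow
plt.imshow(grid.permute(1, 2, 0).numpy())
plt.title(str(labels[:16].tolist()))
plt.axis("off")
plt.show()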
Defining the Model
When I worked on MNIST, LeNet already achieved fairly good results. Considering the limited compute of the small onboard computer, and the strict real-time requirements of the actual application, I have not adopted any of the classic deep convolutional network models for now.
The model essentially uses the LeNet architecture; the only change is setting the output layer to 7 classes.
class LeNet(nn.Module):
    """
    The model: essentially the classic LeNet
    """
    def __init__(self):
        super(LeNet, self).__init__()
        # Convolutional layers
        self.conv = nn.Sequential(
            # in_channels, out_channels, kernel_size
            nn.Conv2d(1, 6, 5),
            nn.Sigmoid(),
            # Max pooling
            nn.MaxPool2d(2, 2),
            nn.Conv2d(6, 16, 5),
            nn.Sigmoid(),
            nn.MaxPool2d(2, 2)
        )
        # Fully connected layers
        self.fc = nn.Sequential(
            nn.Linear(16 * 4 * 4, 120),
            nn.Sigmoid(),
            nn.Linear(120, 84),
            nn.Sigmoid(),
            nn.Linear(84, 7)
        )

    def forward(self, img):
        feature = self.conv(img)
        output = self.fc(feature.view(img.shape[0], -1))
        return output


net = LeNet()
print(net)
The printed model structure:
LeNet(
(conv): Sequential(
(0): Conv2d(1, 6, kernel_size=(5, 5), stride=(1, 1))
(1): Sigmoid()
(2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
(3): Conv2d(6, 16, kernel_size=(5, 5), stride=(1, 1))
(4): Sigmoid()
(5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
)
(fc): Sequential(
(0): Linear(in_features=256, out_features=120, bias=True)
(1): Sigmoid()
(2): Linear(in_features=120, out_features=84, bias=True)
(3): Sigmoid()
(4): Linear(in_features=84, out_features=7, bias=True)
)
)
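Since the real-time requirement motivated the choice of LeNet, it is worth confirming how small the model actually is; with this architecture the count comes out to roughly 44,000 trainable parameters:

# Count the trainable parameters of the network
num_params = sum(p.numel() for p in net.parameters() if p.requires_grad)
print("trainable parameters:", num_params)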
Evaluating the Model
Compute the model's classification accuracy.
# %% Model evaluation
def evaluate_accuracy(data_iter, net, device=None):
    """
    Evaluate the model, with GPU acceleration.
    :param data_iter: test set iterator
    :param net: model to evaluate
    :param device: device to run on
    :return: accuracy
    """
    # If no device is given, use the device of net's parameters
    if device is None and isinstance(net, nn.Module):
        device = list(net.parameters())[0].device
    acc_sum, n = 0.0, 0
    with torch.no_grad():
        for x, y in data_iter:
            if isinstance(net, nn.Module):
                # Evaluation mode, disables dropout
                net.eval()
                acc_sum += (net(x.to(device)).argmax(dim=1) == y.to(device)).float().sum().cpu().item()
                # Switch back to training mode
                net.train()
            n += y.shape[0]
    return acc_sum / n
Training the Model
The training loop follows the procedure I learned while working through Dive-into-DL-PyTorch.
def train_model(net, train_iter, test_iter, loss_func, optimizer, device, num_epochs):
    """
    Train the model.
    :param net: the network
    :param train_iter: training set
    :param test_iter: test set
    :param loss_func: loss function
    :param optimizer: optimizer
    :param device: device to train on
    :param num_epochs: number of epochs
    :return: None
    """
    net = net.to(device)
    print("training device={0}".format(device))
    for i in range(num_epochs):
        # Accumulated loss and accuracy
        train_loss_sum, train_acc_sum = 0.0, 0.0
        # Sample count and batch count
        sample_count, batch_count = 0, 0
        # Training time
        start = time.time()
        for x, y in train_iter:
            x = x.to(device)
            y = y.long().to(device)
            y_output = net(x)
            loss = loss_func(y_output, y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            train_loss_sum += loss.cpu().item()
            train_acc_sum += (y_output.argmax(dim=1) == y).sum().cpu().item()
            sample_count += y.shape[0]
            batch_count += 1
        test_acc = evaluate_accuracy(test_iter, net)
        print("epoch {0}, loss={1:.3f}, train_acc={2:.3f}, test_acc={3:.3f}, time={4:.1f}".format(
            i, train_loss_sum / batch_count, train_acc_sum / sample_count, test_acc, time.time() - start
        ))
Training Procedure
Configure the hyperparameters and train the model. Since the dataset is not very large, the train/test split is redrawn over ten rounds to reuse the data (the code labels this K-fold cross-validation; see the note after the code).
if __name__ == '__main__':
    # %% Set the working directory
    # print(os.getcwd())
    # os.chdir(os.getcwd() + "\learn")
    # Show the current working directory
    print(os.getcwd())
    # %% Hyperparameters
    # Path to the dataset metadata file
    metadata_path = r"./素材/num/label.csv"
    # Training device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # Batch size
    batch_size = 64
    # Number of worker threads
    if sys.platform == "win32":
        num_workers = 0
    else:
        num_workers = 12
    # Fraction of data used for training
    train_rate = 0.8
    # Create the dataset
    src_data = MyDataset(csv_path=metadata_path, transform=trans)
    print('num_of_trainData:', len(src_data))
    # Random train/test split
    train_size = int(train_rate * len(src_data))
    test_size = len(src_data) - train_size
    train_set, test_set = torch.utils.data.random_split(src_data, [train_size, test_size])
    train_iter = DataLoader(dataset=train_set, batch_size=batch_size, shuffle=True, num_workers=num_workers)
    test_iter = DataLoader(dataset=test_set, batch_size=batch_size, shuffle=True, num_workers=num_workers)
    # Print one batch of the dataset
    for i, j in train_iter:
        print(i, j)
        break

    net = LeNet()
    print(net)
    # Epochs per round
    num_epochs = 5
    # Optimizer
    optimizer = torch.optim.Adam(net.parameters(), lr=0.002)
    # Cross-entropy loss
    loss_func = nn.CrossEntropyLoss()
    for i in range(10):
        # Redraw the train/test split each round
        train_size = int(train_rate * len(src_data))
        test_size = len(src_data) - train_size
        train_set, test_set = torch.utils.data.random_split(src_data, [train_size, test_size])
        train_iter = DataLoader(dataset=train_set, batch_size=batch_size, shuffle=True, num_workers=num_workers)
        test_iter = DataLoader(dataset=test_set, batch_size=batch_size, shuffle=True, num_workers=num_workers)
        train_model(net, train_iter, test_iter, loss_func, optimizer, device, num_epochs)
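Strictly speaking, the loop above performs repeated random subsampling rather than true K-fold cross-validation: each round redraws an independent 80/20 split, so a sample may appear in several test sets or in none. In true K-fold, each sample lands in the test fold exactly once. A minimal sketch of what that would look like here, using torch.utils.data.Subset (illustrative only, not part of the project code):

# True K-fold: partition the shuffled indices into K folds,
# using each fold once as the held-out test set
k = 5
indices = torch.randperm(len(src_data)).tolist()
fold_size = len(src_data) // k
for fold in range(k):
    test_idx = indices[fold * fold_size:(fold + 1) * fold_size]
    train_idx = indices[:fold * fold_size] + indices[(fold + 1) * fold_size:]
    train_fold = torch.utils.data.Subset(src_data, train_idx)
    test_fold = torch.utils.data.Subset(src_data, test_idx)
    # build DataLoaders from train_fold / test_fold exactly as above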
Saving the Model
Save the trained parameters for later use.
# Save the trained model parameters
torch.save(net.state_dict(), "./model_param/state_dict.pt")
Testing the Model
Test the accuracy over the whole dataset (note that this includes the samples seen during training, so it overstates generalization) and export the model for use with LibTorch; overall it runs very fast.
"""
@author starrysky
@date 2020/08/16
@details 加载训练好的模型参数, 测试模型, 导出完整的模型供C++项目部署使用
"""
import sys
sys.path.append("./")
import time
import torch
from learn import mymodel
import os
import pandas as pd
# %%
# print(os.getcwd())
# os.chdir(os.getcwd() + r"\test")
print(os.getcwd())
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# %%
model = mymodel.LeNet()
model.load_state_dict(torch.load("./model_param/state_dict.pt"))
model.eval()
print(model)
# 加载样本和标签
df = pd.read_csv("./素材/num/label.csv", index_col=0)
df["predict"] = None
df["is_correct"] = None
print(df.columns)
# %%
ans = 0
start_time = time.time()
for i in range(df.shape[0]):
image_path = df.iloc[i, 0]
image = mymodel.default_loader(image_path)
label = df.iloc[i, 1]
x = mymodel.trans(image)
x_ = x.view(1, 1, 28, 28)
y_predict = model(x_).argmax(dim=1).item()
# print(i, y_predict)
df.iloc[i, 2] = y_predict
df.iloc[i, 3] = y_predict == label
if y_predict == label:
ans += 1
print("正确样本数:{0}, 正确率={1:.4f}".format(ans, ans / df.shape[0]))
print("测试时间:{0:.4f}".format(time.time() - start_time))
# %%
df.to_csv("result.csv", index=False)
image_path = df.iloc[0, 0]
image = mymodel.default_loader(image_path)
x = mymodel.trans(image)
x_ = x.view(1, 1, 28, 28)
traced_script_module = torch.jit.trace(model, x_)
traced_script_module.save("./libtorch_model/model.pt")
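Before handing model.pt to the C++ side, it is cheap to verify that the traced module reproduces the eager model's output (a minimal sketch reusing the x_ tensor from above):

# Reload the exported module and compare it against the eager model
loaded = torch.jit.load("./libtorch_model/model.pt")
assert torch.allclose(loaded(x_), model(x_))
print("traced model output matches the original")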
Summary
Next up is deployment under C++ and OpenCV. The runtime in the C++ environment and on the small onboard computer is still unknown, but given that LeNet currently runs quite fast, switching to a deeper classic convolutional network model can be considered later.