这块之前已经操练过了,把网络结构与训练函数放在这里。
网络结构
import time
import torch
from torch import nn, optim
import torchvision
from utils import *


class LeNet(nn.Module):
    """LeNet-style CNN classifier for 1-channel 28x28 images over 200 classes.

    Two conv/pool stages followed by a three-layer fully connected head
    with dropout regularization.
    """

    def __init__(self):
        super(LeNet, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(1, 6, 5),    # in_channels, out_channels, kernel_size
            nn.ReLU(),
            nn.MaxPool2d(2, 2),    # kernel_size, stride
            nn.Conv2d(6, 16, 5),
            nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )
        # With a 28x28 input: 28 -conv5-> 24 -pool2-> 12 -conv5-> 8 -pool2-> 4,
        # so the flattened feature vector is 16 channels * 4 * 4.
        self.fc = nn.Sequential(
            nn.Linear(16 * 4 * 4, 512),
            nn.LeakyReLU(negative_slope=0.01),
            nn.Dropout(0.5),
            nn.Linear(512, 256),
            nn.Sigmoid(),
            nn.Dropout(0.5),
            nn.Linear(256, 200)    # raw logits; CrossEntropyLoss applies softmax
        )

    def forward(self, img):
        """Run a batch of images through conv features, flatten, and classify.

        img: tensor of shape (batch, 1, 28, 28); returns (batch, 200) logits.
        """
        feature = self.conv(img)
        output = self.fc(feature.view(img.shape[0], -1))
        return output


if __name__ == "__main__":
    net = LeNet()
    print(net)
    batch_size = 64
    resize = [28, 28]
    # load_data_cnn / train come from the project-local utils module.
    train_iter, test_iter = load_data_cnn(batch_size, resize=resize)
    lr, num_epochs = 0.001, 50
    loss = torch.nn.CrossEntropyLoss()
    # weight_decay adds L2 regularization on top of the dropout layers.
    optimizer = torch.optim.Adam(net.parameters(), lr=lr, weight_decay=0.001)
    train(net, train_iter, test_iter, loss=loss, optimizer=optimizer,
          device='cuda', num_epochs=num_epochs, file='cnn.pt')
数据集与训练函数
"""Dataset loaders (flat-vector and image variants) and the training loop.

Samples are read from raw binary files 'f<i>.dat' (one file per class),
containing little-endian float32 values written back to back.
"""
import torch
from torch.utils.data import *
import struct
import os
import time
import torchvision
from PIL import Image
from torchvision.datasets.vision import VisionDataset
from torchvision.transforms import ToPILImage
# from IPython.display import Image
import matplotlib.pyplot as plt
from IPython import display
import hiddenlayer as hl


class ImageSet(Dataset):
    """Flat-vector dataset: each sample is a 1-D float tensor plus a class label.

    Reads `classes` files named 'f<i>.dat' under `path`; each file holds
    `size` samples of `dimensions` float32 values each.
    """

    def __init__(self, path, dimensions, classes, size):
        super(ImageSet, self).__init__()
        data_raw = ()   # accumulated as a tuple of floats across all class files
        label_raw = []
        for i in range(classes):
            file_name = os.path.join(path, 'f' + str(i) + '.dat')
            with open(file_name, 'rb') as f:
                # Each sample is `dimensions` float32 values (4 bytes each).
                data_raw += struct.unpack('f' * size * dimensions, f.read(4 * size * dimensions))
            # Every sample in file i belongs to class i.
            for j in range(size):
                label_raw.append(float(i))
        self.train_data = torch.tensor(data_raw).clone().view(-1, dimensions)
        self.train_label = torch.tensor(label_raw)

    def __getitem__(self, index):
        # Labels are cast to long for use with CrossEntropyLoss.
        return self.train_data[index], self.train_label[index].long()

    def __len__(self):
        return self.train_label.size()[0]


class ImageSetCNN(VisionDataset):
    """Image dataset: each sample is a dim_x x dim_y single-channel image.

    Same on-disk layout as ImageSet, but samples are reshaped to 2-D and
    converted through PIL so torchvision transforms (Resize/ToTensor) apply.
    """

    def __init__(self, root, dim_x, dim_y, classes, size, transform=None, target_transform=None):
        super(ImageSetCNN, self).__init__(root, transform=transform, target_transform=target_transform)
        data_raw = ()
        label_raw = []
        for i in range(classes):
            file_name = os.path.join(root, 'f' + str(i) + '.dat')
            with open(file_name, 'rb') as f:
                data_raw += struct.unpack('f' * size * dim_x * dim_y, f.read(4 * size * dim_x * dim_y))
            for j in range(size):
                # one_hot = torch.zeros(classes)
                # one_hot[i] = 1
                label_raw.append(i)   # integer class index (one-hot not needed for CrossEntropyLoss)
        self.train_data = torch.tensor(data_raw).clone().view(-1, dim_x, dim_y)
        self.train_label = torch.tensor(label_raw)

    def __getitem__(self, index):
        img, target = self.train_data[index], int(self.train_label[index])
        # NOTE(review): train_data is float32, but mode='L' is an 8-bit
        # grayscale mode — verify the .dat files store byte-ranged values and
        # that this float->PIL conversion is intended.
        img = Image.fromarray(img.numpy(), mode='L')
        # img.show()
        if self.transform is not None:
            img = self.transform(img)
        if self.target_transform is not None:
            target = self.target_transform(target)
        return img, target

    def __len__(self):
        return self.train_label.size()[0]


def load_data(batch_size):
    """Build train/test DataLoaders over the flat-vector ImageSet.

    Hard-coded layout: 440 features, 200 classes, 144 train / 18 test
    samples per class, read from the 'train' and 'test' directories.
    """
    training_set = ImageSet('train', 440, 200, 144)
    test_set = ImageSet('test', 440, 200, 18)
    num_workers = 4
    train_iter = DataLoader(training_set, batch_size=batch_size, shuffle=True, num_workers=num_workers)
    test_iter = DataLoader(test_set, batch_size=batch_size, shuffle=True, num_workers=num_workers)
    return train_iter, test_iter


def load_data_cnn(batch_size, resize=None):
    """Build train/test DataLoaders over ImageSetCNN with optional resizing.

    Images are stored as 20x22; `resize` (e.g. [28, 28]) is applied before
    ToTensor so the network sees a fixed input size.
    """
    trans = []
    if resize:
        trans.append(torchvision.transforms.Resize(size=resize))
    trans.append(torchvision.transforms.ToTensor())
    transform = torchvision.transforms.Compose(trans)
    training_set = ImageSetCNN('train', 20, 22, 200, 144, transform=transform)
    test_set = ImageSetCNN('test', 20, 22, 200, 18, transform=transform)
    train_iter = DataLoader(training_set, batch_size=batch_size, shuffle=True, num_workers=4)
    test_iter = DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=4)
    return train_iter, test_iter


def train(net, train_iter, test_iter, loss, optimizer, device, num_epochs, file):
    """Train `net`, checkpointing the best test accuracy to `file`.

    Logs per-epoch loss/accuracy to a hiddenlayer canvas and stops early
    after 20 consecutive epochs without a test-accuracy improvement.
    """
    # Record training metrics over epochs.
    history = hl.History()
    # Canvas used to visualize the metric curves.
    canvas = hl.Canvas()
    net = net.to(device)
    print("training on ", device)
    best_acc = 0
    no_progress = 0   # epochs since the last test-accuracy improvement
    # loss = torch.nn.CrossEntropyLoss()
    for epoch in range(num_epochs):
        train_l_sum, train_acc_sum, n, batch_count, start = 0.0, 0.0, 0, 0, time.time()
        for X, y in train_iter:
            X = X.to(device)
            y = y.to(device)
            y_hat = net(X)
            l = loss(y_hat, y)
            optimizer.zero_grad()
            l.backward()
            optimizer.step()
            train_l_sum += l.cpu().item()
            train_acc_sum += (y_hat.argmax(dim=1) == y).sum().cpu().item()
            n += y.shape[0]
            batch_count += 1
        test_acc = evaluate(test_iter, net)
        print('epoch %d, loss %.4f, train acc %.3f, test acc %.3f, time %.1f sec' % (epoch + 1, train_l_sum / batch_count, train_acc_sum / n, test_acc, time.time() - start))
        history.log(epoch, train_loss=train_l_sum / batch_count, train_acc=train_acc_sum / n, test_acc=test_acc)
        # Redraw all three metric curves on the shared canvas.
        with canvas:
            canvas.draw_plot(history["train_loss"])
            canvas.draw_plot(history["train_acc"])
            canvas.draw_plot(history["test_acc"])
        if test_acc > best_acc:
            no_progress = 0
            torch.save(net.state_dict(), file)   # checkpoint only on improvement
            best_acc = test_acc
            print('best model saved to ' + file)
        else:
            no_progress += 1
            # Early stopping: give up after 20 stagnant epochs.
            if no_progress > 20:
                break


def evaluate(data_iter, net, device=None):
    """Return classification accuracy of `net` over `data_iter`.

    If `device` is not given, it is taken from the network's parameters.
    """
    if device is None and isinstance(net, torch.nn.Module):
        device = list(net.parameters())[0].device
    acc_sum, n = 0.0, 0
    with torch.no_grad():
        for X, y in data_iter:
            net.eval()   # eval mode: disables the dropout layers
            acc_sum += (net(X.to(device)).argmax(dim=1) == y.to(device)).float().sum().cpu().item()
            net.train()  # switch back to training mode
            n += y.shape[0]
    return acc_sum / n