下面的范例使用Pytorch的中阶API实现线性回归模型和DNN二分类模型。
Pytorch的中阶API主要包括各种模型层,损失函数,优化器,数据管道等等。

  1. import os
  2. import datetime
  3. #打印时间
  4. def printbar():
  5. nowtime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  6. print("\n"+"=========="*8 + "%s"%nowtime)
  7. #mac系统上pytorch和matplotlib在jupyter中同时跑需要更改环境变量
  8. os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"

线性回归模型

准备数据

import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, TensorDataset
  8. #样本数量
  9. n = 400
  10. # 生成测试用数据集
  11. X = 10*torch.rand([n,2])-5.0 #torch.rand是均匀分布
  12. w0 = torch.tensor([[2.0],[-3.0]])
  13. b0 = torch.tensor([[10.0]])
  14. Y = X@w0 + b0 + torch.normal( 0.0,2.0,size = [n,1]) # @表示矩阵乘法,增加正态扰动
  1. # 数据可视化
  2. %matplotlib inline
  3. %config InlineBackend.figure_format = 'svg'
  4. plt.figure(figsize = (12,5))
  5. ax1 = plt.subplot(121)
  6. ax1.scatter(X[:,0],Y[:,0], c = "b",label = "samples")
  7. ax1.legend()
  8. plt.xlabel("x1")
  9. plt.ylabel("y",rotation = 0)
  10. ax2 = plt.subplot(122)
  11. ax2.scatter(X[:,1],Y[:,0], c = "g",label = "samples")
  12. ax2.legend()
  13. plt.xlabel("x2")
  14. plt.ylabel("y",rotation = 0)
  15. plt.show()
  16. # 数据可视化
  17. %matplotlib inline
  18. %config InlineBackend.figure_format = 'svg'
  19. plt.figure(figsize = (12,5))
  20. ax1 = plt.subplot(121)
  21. ax1.scatter(X[:,0].numpy(),Y[:,0].numpy(), c = "b",label = "samples")
  22. ax1.legend()
  23. plt.xlabel("x1")
  24. plt.ylabel("y",rotation = 0)
  25. ax2 = plt.subplot(122)
  26. ax2.scatter(X[:,1].numpy(),Y[:,0].numpy(), c = "g",label = "samples")
  27. ax2.legend()
  28. plt.xlabel("x2")
  29. plt.ylabel("y",rotation = 0)
  30. plt.show()

image.png

  1. #构建输入数据管道
  2. ds = TensorDataset(X,Y)
  3. ds_train,ds_valid = torch.utils.data.random_split(ds,[int(400*0.7),400-int(400*0.7)])
  4. dl_train = DataLoader(ds_train,batch_size = 10,shuffle=True,num_workers=2)
  5. dl_valid = DataLoader(ds_valid,batch_size = 10,num_workers=2)

定义模型

  1. model = nn.Linear(2,1) #线性层
  2. model.loss_func = nn.MSELoss()
  3. model.optimizer = torch.optim.SGD(model.parameters(),lr = 0.01)

训练模型

  1. def train_step(model, features, labels):
  2. predictions = model(features)
  3. loss = model.loss_func(predictions,labels)
  4. loss.backward()
  5. model.optimizer.step()
  6. model.optimizer.zero_grad()
  7. return loss.item()
  8. # 测试train_step效果
  9. features,labels = next(iter(dl))
  10. train_step(model,features,labels)

image.png

  1. def train_model(model,epochs):
  2. for epoch in range(1,epochs+1):
  3. for features, labels in dl:
  4. loss = train_step(model,features,labels)
  5. if epoch%50==0:
  6. printbar()
  7. w = model.state_dict()["weight"]
  8. b = model.state_dict()["bias"]
  9. print("epoch =",epoch,"loss = ",loss)
  10. print("w =",w)
  11. print("b =",b)
  12. train_model(model,epochs = 200)

image.png

  1. # 结果可视化
  2. %matplotlib inline
  3. %config InlineBackend.figure_format = 'svg'
  4. w,b = model.state_dict()["weight"],model.state_dict()["bias"]
  5. plt.figure(figsize = (12,5))
  6. ax1 = plt.subplot(121)
  7. ax1.scatter(X[:,0],Y[:,0], c = "b",label = "samples")
  8. ax1.plot(X[:,0],w[0,0]*X[:,0]+b[0],"-r",linewidth = 5.0,label = "model")
  9. ax1.legend()
  10. plt.xlabel("x1")
  11. plt.ylabel("y",rotation = 0)
  12. ax2 = plt.subplot(122)
  13. ax2.scatter(X[:,1],Y[:,0], c = "g",label = "samples")
  14. ax2.plot(X[:,1],w[0,1]*X[:,1]+b[0],"-r",linewidth = 5.0,label = "model")
  15. ax2.legend()
  16. plt.xlabel("x2")
  17. plt.ylabel("y",rotation = 0)
  18. plt.show()

image.png


DNN二分类模型

准备数据

  1. import numpy as np
  2. import pandas as pd
  3. from matplotlib import pyplot as plt
  4. import torch
  5. from torch import nn
  6. %matplotlib inline
  7. %config InlineBackend.figure_format = 'svg'
  8. #正负样本数量
  9. n_positive,n_negative = 2000,2000
  10. #生成正样本, 小圆环分布
  11. r_p = 5.0 + torch.normal(0.0,1.0,size = [n_positive,1])
  12. theta_p = 2*np.pi*torch.rand([n_positive,1])
  13. Xp = torch.cat([r_p*torch.cos(theta_p),r_p*torch.sin(theta_p)],axis = 1)
  14. Yp = torch.ones_like(r_p)
  15. #生成负样本, 大圆环分布
  16. r_n = 8.0 + torch.normal(0.0,1.0,size = [n_negative,1])
  17. theta_n = 2*np.pi*torch.rand([n_negative,1])
  18. Xn = torch.cat([r_n*torch.cos(theta_n),r_n*torch.sin(theta_n)],axis = 1)
  19. Yn = torch.zeros_like(r_n)
  20. #汇总样本
  21. X = torch.cat([Xp,Xn],axis = 0)
  22. Y = torch.cat([Yp,Yn],axis = 0)
  23. #可视化
  24. plt.figure(figsize = (6,6))
  25. plt.scatter(Xp[:,0].numpy(),Xp[:,1].numpy(),c = "r")
  26. plt.scatter(Xn[:,0].numpy(),Xn[:,1].numpy(),c = "g")
  27. plt.legend(["positive","negative"]);

image.png

  1. #构建输入数据管道
  2. ds = TensorDataset(X,Y)
  3. dl = DataLoader(ds,batch_size = 10,shuffle=True,num_workers=2)

定义模型

  1. class DNNModel(nn.Module):
  2. def __init__(self):
  3. super(DNNModel, self).__init__()
  4. self.fc1 = nn.Linear(2,4)
  5. self.fc2 = nn.Linear(4,8)
  6. self.fc3 = nn.Linear(8,1)
  7. # 正向传播
  8. def forward(self,x):
  9. x = F.relu(self.fc1(x))
  10. x = F.relu(self.fc2(x))
  11. y = nn.Sigmoid()(self.fc3(x))
  12. return y
  13. # 损失函数
  14. def loss_func(self,y_pred,y_true):
  15. return nn.BCELoss()(y_pred,y_true)
  16. # 评估函数(准确率)
  17. def metric_func(self,y_pred,y_true):
  18. y_pred = torch.where(y_pred>0.5,torch.ones_like(y_pred,dtype = torch.float32),
  19. torch.zeros_like(y_pred,dtype = torch.float32))
  20. acc = torch.mean(1-torch.abs(y_true-y_pred))
  21. return acc
  22. # 优化器
  23. @property
  24. def optimizer(self):
  25. return torch.optim.Adam(self.parameters(),lr = 0.001)
  26. model = DNNModel()
  1. # 测试模型结构
  2. (features,labels) = next(iter(dl))
  3. predictions = model(features)
  4. loss = model.loss_func(predictions,labels)
  5. metric = model.metric_func(predictions,labels)
  6. print("init loss:",loss.item())
  7. print("init metric:",metric.item())
  8. # 测试模型结构
  9. batch_size = 10
  10. (features,labels) = next(data_iter(X,Y,batch_size))
  11. predictions = model(features)
  12. loss = model.loss_func(labels,predictions)
  13. metric = model.metric_func(labels,predictions)
  14. print("init loss:", loss.item())
  15. print("init metric:", metric.item())

image.png

训练模型

  1. def train_step(model, features, labels):
  2. # 正向传播求损失
  3. predictions = model(features)
  4. loss = model.loss_func(predictions,labels)
  5. metric = model.metric_func(predictions,labels)
  6. # 反向传播求梯度
  7. loss.backward()
  8. # 更新模型参数
  9. model.optimizer.step()
  10. model.optimizer.zero_grad()
  11. return loss.item(),metric.item()
  12. # 测试train_step效果
  13. features,labels = next(iter(dl))
  14. train_step(model,features,labels)

image.png

  1. def train_model(model,epochs):
  2. for epoch in range(1,epochs+1):
  3. loss_list,metric_list = [],[]
  4. for features, labels in dl:
  5. lossi,metrici = train_step(model,features,labels)
  6. loss_list.append(lossi)
  7. metric_list.append(metrici)
  8. loss = np.mean(loss_list)
  9. metric = np.mean(metric_list)
  10. if epoch%100==0:
  11. printbar()
  12. print("epoch =",epoch,"loss = ",loss,"metric = ",metric)
  13. train_model(model,epochs = 300)

image.png

  1. # 结果可视化
  2. fig, (ax1,ax2) = plt.subplots(nrows=1,ncols=2,figsize = (12,5))
  3. ax1.scatter(Xp[:,0],Xp[:,1], c="r")
  4. ax1.scatter(Xn[:,0],Xn[:,1],c = "g")
  5. ax1.legend(["positive","negative"]);
  6. ax1.set_title("y_true");
  7. Xp_pred = X[torch.squeeze(model.forward(X)>=0.5)]
  8. Xn_pred = X[torch.squeeze(model.forward(X)<0.5)]
  9. ax2.scatter(Xp_pred[:,0],Xp_pred[:,1],c = "r")
  10. ax2.scatter(Xn_pred[:,0],Xn_pred[:,1],c = "g")
  11. ax2.legend(["positive","negative"]);
  12. ax2.set_title("y_pred");

image.png