下面的范例使用PyTorch的低阶API实现线性回归模型和DNN二分类模型。
低阶API主要包括张量操作、计算图和自动微分。

  1. import os
  2. import datetime
  3. #打印时间
  4. def printbar():
  5. nowtime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  6. print("\n"+"=========="*8 + "%s"%nowtime)
  7. #mac系统上pytorch和matplotlib在jupyter中同时跑需要更改环境变量
  8. os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"

线性回归模型

准备数据

  1. import numpy as np
  2. import pandas as pd
  3. from matplotlib import pyplot as plt
  4. import torch
  5. from torch import nn
  6. #样本数量
  7. n = 400
  8. # 生成测试用数据集
  9. X = 10*torch.rand([n,2])-5.0 #torch.rand是均匀分布
  10. w0 = torch.tensor([[2.0],[-3.0]])
  11. b0 = torch.tensor([[10.0]])
  12. Y = X@w0 + b0 + torch.normal( 0.0,2.0,size = [n,1]) # @表示矩阵乘法,增加正态扰动
  1. # 数据可视化
  2. %matplotlib inline
  3. %config InlineBackend.figure_format = 'svg'
  4. plt.figure(figsize = (12,5))
  5. ax1 = plt.subplot(121)
  6. ax1.scatter(X[:,0].numpy(),Y[:,0].numpy(), c = "b",label = "samples")
  7. ax1.legend()
  8. plt.xlabel("x1")
  9. plt.ylabel("y",rotation = 0)
  10. ax2 = plt.subplot(122)
  11. ax2.scatter(X[:,1].numpy(),Y[:,0].numpy(), c = "g",label = "samples")
  12. ax2.legend()
  13. plt.xlabel("x2")
  14. plt.ylabel("y",rotation = 0)
  15. plt.show()

image.png

  1. # 构建数据管道迭代器
  2. def data_iter(features, labels, batch_size=8):
  3. num_examples = len(features)
  4. indices = list(range(num_examples))
  5. np.random.shuffle(indices) #样本的读取顺序是随机的
  6. for i in range(0, num_examples, batch_size):
  7. indexs = torch.LongTensor(indices[i: min(i + batch_size, num_examples)])
  8. yield features.index_select(0, indexs), labels.index_select(0, indexs)
  9. # 测试数据管道效果
  10. batch_size = 8
  11. (features,labels) = next(data_iter(X,Y,batch_size))
  12. print(features)
  13. print(labels)

image.png

定义模型

  1. # 定义模型
  2. class LinearRegression:
  3. def __init__(self):
  4. self.w = torch.randn_like(w0,requires_grad=True)
  5. self.b = torch.zeros_like(b0,requires_grad=True)
  6. #正向传播
  7. def forward(self,x):
  8. return x@self.w + self.b
  9. # 损失函数
  10. def loss_func(self,y_pred,y_true):
  11. return torch.mean((y_pred - y_true)**2/2)
  12. model = LinearRegression()

训练模型

  1. def train_step(model, features, labels):
  2. predictions = model.forward(features)
  3. loss = model.loss_func(predictions,labels)
  4. # 反向传播求梯度
  5. loss.backward()
  6. # 使用torch.no_grad()避免梯度记录,也可以通过操作 model.w.data 实现避免梯度记录
  7. with torch.no_grad():
  8. # 梯度下降法更新参数
  9. model.w -= 0.001*model.w.grad
  10. model.b -= 0.001*model.b.grad
  11. # 梯度清零
  12. model.w.grad.zero_()
  13. model.b.grad.zero_()
  14. return loss
  1. # 测试train_step效果
  2. batch_size = 10
  3. (features,labels) = next(data_iter(X,Y,batch_size))
  4. train_step(model,features,labels)

image.png

  1. def train_model(model,epochs):
  2. for epoch in range(1,epochs+1):
  3. for features, labels in data_iter(X,Y,10):
  4. loss = train_step(model,features,labels)
  5. if epoch%200==0:
  6. printbar()
  7. print("epoch =",epoch,"loss = ",loss.item())
  8. print("model.w =",model.w.data)
  9. print("model.b =",model.b.data)
  10. train_model(model,epochs = 1000)

image.png

  1. # 结果可视化
  2. %matplotlib inline
  3. %config InlineBackend.figure_format = 'svg'
  4. plt.figure(figsize = (12,5))
  5. ax1 = plt.subplot(121)
  6. ax1.scatter(X[:,0].numpy(),Y[:,0].numpy(), c = "b",label = "samples")
  7. ax1.plot(X[:,0].numpy(),(model.w[0].data*X[:,0]+model.b[0].data).numpy(),"-r",linewidth = 5.0,label = "model")
  8. ax1.legend()
  9. plt.xlabel("x1")
  10. plt.ylabel("y",rotation = 0)
  11. ax2 = plt.subplot(122)
  12. ax2.scatter(X[:,1].numpy(),Y[:,0].numpy(), c = "g",label = "samples")
  13. ax2.plot(X[:,1].numpy(),(model.w[1].data*X[:,1]+model.b[0].data).numpy(),"-r",linewidth = 5.0,label = "model")
  14. ax2.legend()
  15. plt.xlabel("x2")
  16. plt.ylabel("y",rotation = 0)
  17. plt.show()

image.png


DNN二分类模型

准备数据

  1. import numpy as np
  2. import pandas as pd
  3. from matplotlib import pyplot as plt
  4. import torch
  5. from torch import nn
  6. %matplotlib inline
  7. %config InlineBackend.figure_format = 'svg'
  8. #正负样本数量
  9. n_positive,n_negative = 2000,2000
  10. #生成正样本, 小圆环分布
  11. r_p = 5.0 + torch.normal(0.0,1.0,size = [n_positive,1])
  12. theta_p = 2*np.pi*torch.rand([n_positive,1])
  13. Xp = torch.cat([r_p*torch.cos(theta_p),r_p*torch.sin(theta_p)],axis = 1)
  14. Yp = torch.ones_like(r_p)
  15. #生成负样本, 大圆环分布
  16. r_n = 8.0 + torch.normal(0.0,1.0,size = [n_negative,1])
  17. theta_n = 2*np.pi*torch.rand([n_negative,1])
  18. Xn = torch.cat([r_n*torch.cos(theta_n),r_n*torch.sin(theta_n)],axis = 1)
  19. Yn = torch.zeros_like(r_n)
  20. #汇总样本
  21. X = torch.cat([Xp,Xn],axis = 0)
  22. Y = torch.cat([Yp,Yn],axis = 0)
  23. #可视化
  24. plt.figure(figsize = (6,6))
  25. plt.scatter(Xp[:,0].numpy(),Xp[:,1].numpy(),c = "r")
  26. plt.scatter(Xn[:,0].numpy(),Xn[:,1].numpy(),c = "g")
  27. plt.legend(["positive","negative"]);

image.png

  1. # 构建数据管道迭代器
  2. def data_iter(features, labels, batch_size=8):
  3. num_examples = len(features)
  4. indices = list(range(num_examples))
  5. np.random.shuffle(indices) #样本的读取顺序是随机的
  6. for i in range(0, num_examples, batch_size):
  7. indexs = torch.LongTensor(indices[i: min(i + batch_size, num_examples)])
  8. yield features.index_select(0, indexs), labels.index_select(0, indexs)
  9. # 测试数据管道效果
  10. batch_size = 8
  11. (features,labels) = next(data_iter(X,Y,batch_size))
  12. print(features)
  13. print(labels)

image.png

定义模型

此处范例我们利用nn.Module来组织模型变量。

  1. class DNNModel(nn.Module):
  2. def __init__(self):
  3. super(DNNModel, self).__init__()
  4. self.w1 = nn.Parameter(torch.randn(2,4))
  5. self.b1 = nn.Parameter(torch.zeros(1,4))
  6. self.w2 = nn.Parameter(torch.randn(4,8))
  7. self.b2 = nn.Parameter(torch.zeros(1,8))
  8. self.w3 = nn.Parameter(torch.randn(8,1))
  9. self.b3 = nn.Parameter(torch.zeros(1,1))
  10. # 正向传播
  11. def forward(self,x):
  12. x = torch.relu(x@self.w1 + self.b1)
  13. x = torch.relu(x@self.w2 + self.b2)
  14. y = torch.sigmoid(x@self.w3 + self.b3)
  15. return y
  16. # 损失函数(二元交叉熵)
  17. def loss_func(self,y_pred,y_true):
  18. #将预测值限制在1e-7以上, 1- (1e-7)以下,避免log(0)错误
  19. eps = 1e-7
  20. y_pred = torch.clamp(y_pred,eps,1.0-eps)
  21. bce = - y_true*torch.log(y_pred) - (1-y_true)*torch.log(1-y_pred)
  22. return torch.mean(bce)
  23. # 评估指标(准确率)
  24. def metric_func(self,y_pred,y_true):
  25. y_pred = torch.where(y_pred>0.5,torch.ones_like(y_pred,dtype = torch.float32),torch.zeros_like(y_pred,dtype = torch.float32))
  26. acc = torch.mean(1-torch.abs(y_true-y_pred))
  27. return acc
  28. model = DNNModel()
  1. # 测试模型结构
  2. batch_size = 10
  3. (features,labels) = next(data_iter(X,Y,batch_size))
  4. predictions = model(features)
  5. loss = model.loss_func(labels,predictions)
  6. metric = model.metric_func(labels,predictions)
  7. print("init loss:", loss.item())
  8. print("init metric:", metric.item())

image.png

  1. len(list(model.parameters()))

image.png

训练模型

  1. def train_step(model, features, labels):
  2. # 正向传播求损失
  3. predictions = model.forward(features)
  4. loss = model.loss_func(predictions,labels)
  5. metric = model.metric_func(predictions,labels)
  6. # 反向传播求梯度
  7. loss.backward()
  8. # 梯度下降法更新参数
  9. for param in model.parameters():
  10. #注意是对param.data进行重新赋值,避免此处操作引起梯度记录
  11. param.data = (param.data - 0.01*param.grad.data)
  12. # 梯度清零
  13. model.zero_grad()
  14. return loss.item(),metric.item()
  15. def train_model(model,epochs):
  16. for epoch in range(1,epochs+1):
  17. loss_list,metric_list = [],[]
  18. for features, labels in data_iter(X,Y,20):
  19. lossi,metrici = train_step(model,features,labels)
  20. loss_list.append(lossi)
  21. metric_list.append(metrici)
  22. loss = np.mean(loss_list)
  23. metric = np.mean(metric_list)
  24. if epoch%100==0:
  25. printbar()
  26. print("epoch =",epoch,"loss = ",loss,"metric = ",metric)
  27. train_model(model,epochs = 1000)

image.png

  1. # 结果可视化
  2. fig, (ax1,ax2) = plt.subplots(nrows=1,ncols=2,figsize = (12,5))
  3. ax1.scatter(Xp[:,0],Xp[:,1], c="r")
  4. ax1.scatter(Xn[:,0],Xn[:,1],c = "g")
  5. ax1.legend(["positive","negative"]);
  6. ax1.set_title("y_true");
  7. Xp_pred = X[torch.squeeze(model.forward(X)>=0.5)]
  8. Xn_pred = X[torch.squeeze(model.forward(X)<0.5)]
  9. ax2.scatter(Xp_pred[:,0],Xp_pred[:,1],c = "r")
  10. ax2.scatter(Xn_pred[:,0],Xn_pred[:,1],c = "g")
  11. ax2.legend(["positive","negative"]);
  12. ax2.set_title("y_pred");

image.png