机器学习在这几年大火,几乎所有领域的研究都有用到机器学习的方法,AI+X的研究方法在各领域已经逐渐展开。我心血来潮,想用Python实现部分机器学习的模型,在实现过程中锻炼解决问题的能力以及Python编程能力,并且掌握一套“写代码+上网找资料”的方法论。
线性回归
线性回归的代码如下:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
sns.set()
if __name__ == "__main__":
    # Synthetic data: a random line sampled at x = 10..50 plus Gaussian noise.
    sample_count = 41
    x = np.linspace(10, 50, sample_count)
    y = (np.random.random() * x + np.random.random()
         + 2.8 * np.random.randn(1, sample_count))
    # Design matrix with one row per sample: [x_i, 1].
    H = np.column_stack((x, np.ones(sample_count)))
    # Solve the least-squares problem H @ p = y.T directly instead of forming
    # inv(H.T @ H) explicitly, which is less accurate when H is ill-conditioned.
    p, _residuals, _rank, _singular_values = np.linalg.lstsq(H, y.T, rcond=None)
    print(p)
    # p has shape (2, 1): slope first, intercept second.
    ye = p[0][0] * x + p[1][0]
    plt.scatter(x, y)
    plt.plot(x, ye, color="red")
    plt.show()
支持向量机与核函数
支持向量机与核函数(直接用的sklearn)代码如下:
# -*- coding: utf-8 -*-
import scipy.misc, scipy.io, scipy.optimize
from sklearn import svm
from sklearn import model_selection
import numpy as np
import matplotlib.pyplot as plt
# Select the SimHei font for sans-serif text (presumably so the Chinese plot
# titles used below can be rendered -- confirm the font is installed).
plt.rcParams['font.sans-serif'] = ['SimHei']
# Turn off matplotlib's Unicode minus sign for axis labels.
plt.rcParams['axes.unicode_minus'] = False
def plot(data):
    """Plot labelled 2-D samples on the current matplotlib axes.

    data: 2-D array; columns 0 and 1 are the x/y coordinates and column 2 is
    the class label. Rows labelled 1 are drawn as blue '+' markers, rows
    labelled 0 as yellow circles.
    """
    labels = data[:, 2]
    for label_value, marker in ((1, 'b+'), (0, 'yo')):
        group = data[labels == label_value]
        plt.plot(group[:, 0], group[:, 1], marker)
# Draw the decision boundary of a fitted SVM
def visualize_boundary(X, trained_svm):
    """Draw the decision boundary of ``trained_svm`` on the current axes.

    X: 2-D array whose first two columns give the plotting range.
    trained_svm: fitted estimator exposing ``get_params()`` and, depending on
    the kernel, ``coef_``/``intercept_`` (linear) or ``predict`` (rbf).

    Only the 'linear' and 'rbf' kernels are handled; any other kernel draws
    nothing.
    """
    kernel = trained_svm.get_params()['kernel']
    if kernel == 'linear':
        w = trained_svm.coef_[0]
        intercept = trained_svm.intercept_
        xp = np.linspace(min(X[:, 0]), max(X[:, 0]), 100)
        # Boundary w0*x1 + w1*x2 + b = 0  =>  x2 = -(w0/w1)*x1 - b/w1
        a = -w[0] / w[1]
        b = intercept[0] / w[1]
        yp = a * xp - b
        plt.plot(xp, yp, 'b-')
    elif kernel == 'rbf':
        x1plot = np.linspace(min(X[:, 0]), max(X[:, 0]), 100)
        x2plot = np.linspace(min(X[:, 1]), max(X[:, 1]), 100)
        X1, X2 = np.meshgrid(x1plot, x2plot)
        # Classify the whole grid with a single predict() call instead of one
        # call per grid column, then fold the labels back into the grid shape.
        grid_points = np.c_[X1.ravel(), X2.ravel()]
        vals = np.asarray(trained_svm.predict(grid_points), dtype=float)
        vals = vals.reshape(X1.shape)
        plt.contour(X1, X2, vals, colors='blue')
# Gaussian (RBF) kernel
def gaussian_kernel(x1, x2, sigma):
    """Return exp(-||x1 - x2||^2 / (2 * sigma^2)).

    x1, x2: array-likes of equal shape (lists, tuples or numpy arrays).
    sigma: kernel bandwidth; must be non-zero.
    """
    # asarray lets plain Python sequences be subtracted element-wise.
    diff = np.asarray(x1, dtype=float) - np.asarray(x2, dtype=float)
    squared_distance = np.sum(diff * diff)
    return np.exp(-squared_distance / (2 * sigma * sigma))
def dataset3_params_ver3(X, y, X_val, y_val):
    """Grid-search C and gamma for an RBF SVM using GridSearchCV.

    X, y: training samples and labels (y is flattened before fitting).
    X_val, y_val: accepted for signature parity with the other search
    helpers but not used; GridSearchCV is fitted on X and y only.

    Returns the ``best_params_`` dict of the fitted grid search.
    """
    # Plain locals: the previous code assigned the candidate list to
    # ``np.c_values``, which set an attribute on the numpy module itself.
    c_values = [0.01, 0.03, 0.1, 0.3, 1, 3, 10, 30]
    sigma_values = [0.01, 0.03, 0.1, 0.3, 1, 3, 10, 30]
    # This script uses gamma = 1 / sigma throughout.
    gammas = [1.0 / sigma for sigma in sigma_values]
    parameters = {'kernel': ('rbf',), 'C': c_values, 'gamma': gammas}
    grid = model_selection.GridSearchCV(svm.SVC(), parameters)
    return grid.fit(X, y.ravel()).best_params_
def dataset2_params_ver2(X, y, X_val, y_val):
    """Search C and sigma for an RBF SVM by validation-set accuracy.

    Every (C, sigma) pair from the candidate lists is fitted on (X, y) and
    scored with ``score`` on (X_val, y_val); the first pair reaching the
    highest score wins.

    Returns a dict with keys 'score', 'C', 'sigma' and 'gamma', where
    gamma = 1 / sigma is the value actually passed to the SVM.
    """
    # Plain locals: the previous code assigned the candidate list to
    # ``np.c_values``, which set an attribute on the numpy module itself.
    c_values = [0.01, 0.03, 0.1, 0.3, 1, 3, 10, 30]
    sigma_values = [0.01, 0.03, 0.1, 0.3, 1, 3, 10, 30]
    raveled_y = y.ravel()  # 1-D labels avoid the SVM's column-vector warning
    val_targets = np.ravel(y_val)
    rbf_svm = svm.SVC(kernel='rbf')
    best = {'score': -999, 'C': 0.0, 'sigma': 0.0}
    for C in c_values:
        for sigma in sigma_values:
            rbf_svm.set_params(C=C, gamma=1.0 / sigma)
            rbf_svm.fit(X, raveled_y)
            score = rbf_svm.score(X_val, val_targets)
            # Keep the highest validation score seen so far.
            if score > best['score']:
                best.update(score=score, C=C, sigma=sigma, gamma=1.0 / sigma)
    return best
def params_search(X, y, X_val, y_val):
    """Search C and sigma for an RBF SVM by validation-set error rate.

    Every (C, sigma) pair from the candidate lists is fitted on (X, y); the
    error is the fraction of validation samples whose prediction differs
    from ``y_val``. The first pair reaching the lowest error wins.

    Returns a dict with keys 'error', 'C', 'sigma' and 'gamma', where
    gamma = 1 / sigma is the value actually passed to the SVM.
    """
    # Plain locals: the previous code assigned the candidate list to
    # ``np.c_values``, which set an attribute on the numpy module itself.
    c_values = [0.01, 0.03, 0.1, 0.3, 1, 3, 10, 30]
    sigma_values = [0.01, 0.03, 0.1, 0.3, 1, 3, 10, 30]
    raveled_y = y.ravel()
    val_targets = np.ravel(y_val)
    rbf_svm = svm.SVC(kernel='rbf')
    best = {'error': 999, 'C': 0.0, 'sigma': 0.0}
    for C in c_values:
        for sigma in sigma_values:
            rbf_svm.set_params(C=C, gamma=1.0 / sigma)
            rbf_svm.fit(X, raveled_y)
            # One vectorised predict over the validation set; this also
            # drops the former hard-coded assumption of two features.
            error = (rbf_svm.predict(X_val) != val_targets).mean()
            if error < best['error']:
                best.update(error=error, C=C, sigma=sigma, gamma=1.0 / sigma)
    return best
# Linearly separable SVM
def part1():
    """Plot dataset 1, then fit and plot linear SVMs with C = 1 and C = 30."""
    # Step 1: load and show dataset 1
    mat = scipy.io.loadmat("dataset_1.mat")
    X, y = mat['X'], mat['y']
    labelled = np.c_[X, y]
    targets = y.ravel()

    plt.title('数据集1分布')
    plot(labelled)
    plt.show(block=True)

    # Steps 2 and 3: train a linear SVM for each penalty value and draw its
    # decision boundary over the data.
    for penalty, title in ((1, 'C=1的SVM决策边界'), (30, 'C=30的SVM决策边界')):
        linear_svm = svm.SVC(C=penalty, kernel='linear')
        linear_svm.fit(X, targets)
        plt.title(title)
        plot(labelled)
        visualize_boundary(X, linear_svm)
        plt.show(block=True)
# Non-linearly separable SVM
def part2():
    """Print a Gaussian-kernel example, then fit and plot an RBF SVM on dataset 2."""
    # Step 1: evaluate the hand-written Gaussian kernel on two sample vectors
    first = np.array([1, 2, 1])
    second = np.array([0, 4, -1])
    similarity = gaussian_kernel(first, second, 2)
    print("样本x1和x2之间的相似度: %f" % similarity)

    # Step 2: load and show dataset 2
    mat = scipy.io.loadmat("dataset_2.mat")
    X, y = mat['X'], mat['y']
    labelled = np.c_[X, y]
    plt.title('数据集2分布')
    plot(labelled)
    plt.show(block=True)

    # Train an RBF SVM; this script passes gamma = 1 / sigma with sigma = 0.1
    sigma = 0.1
    rbf_svm = svm.SVC(C=1, kernel='rbf', gamma=1.0 / sigma)
    rbf_svm.fit(X, y.ravel())

    # Draw the non-linear decision boundary over the data
    plt.title('C=1的高斯核函数SVM决策边界')
    plot(labelled)
    visualize_boundary(X, rbf_svm)
    plt.show(block=True)
# Parameter search
def part3():
    """Plot dataset 3 and its validation set, search C/gamma, plot the result.

    The RBF SVM is fitted *after* the searched parameters are applied, so
    the plotted boundary belongs to the selected model. Previously the model
    was fitted with default parameters, the parameters were changed with
    ``set_params`` and the stale fit was plotted.
    """
    # Step 1: load dataset 3 and its validation split
    mat = scipy.io.loadmat("dataset_3.mat")
    X, y = mat['X'], mat['y']
    X_val, y_val = mat['Xval'], mat['yval']

    plt.title('数据集3分布')
    plot(np.c_[X, y])
    plt.show(block=True)

    plt.title('验证集分布')
    plot(np.c_[X_val, y_val])
    plt.show(block=True)

    # Step 2: pick C and gamma on the validation set, then train with them.
    # dataset2_params_ver2 / dataset3_params_ver3 are alternative search
    # helpers taking the same arguments.
    best1 = params_search(X, y, X_val, y_val)
    rbf_svm = svm.SVC(kernel='rbf', C=best1['C'], gamma=best1['gamma'])
    rbf_svm.fit(X, y.ravel())

    # Step 3: draw the decision boundary of the selected model
    plt.title('参数搜索后的决策边界')
    plot(np.c_[X, y])
    visualize_boundary(X, rbf_svm)
    plt.show(block=True)
def main():
    """Configure numpy printing and run the three SVM demos in order."""
    np.set_printoptions(precision=6, linewidth=200)
    for demo in (part1, part2, part3):
        demo()


if __name__ == '__main__':
    main()
前馈神经网络
多层感知器的Baseline代码如下:
"""
下面的代码只是神经网络的一个Baseline
还可以继续深入实现其他功能,例如:
1、自适应学习率(已实现)
2、Adam RMSprop等训练方式
3、Dropout(随机失活神经元)来避免过拟合问题
4、Mini-batch的训练方式等等
"""
import numpy as np
from sklearn import datasets
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.preprocessing import StandardScaler # 标准化工具
import seaborn as sns
sns.set()
k = 0 # adaptive learning-rate multiplier; in this script it is only referenced from commented-out lines
# Activation function: adds the non-linearity
def sigmoid(x, method):
    """Return the logistic function 1 / (1 + exp(-x)) of ``x``.

    x: scalar or array-like.
    method: 1 returns a value shaped like ``x`` (a numpy scalar for scalar
        input); any other value always returns a numpy array. Both modes
        compute the same numbers; the flag is kept for existing callers.

    The value is computed from exp(-|x|), which never overflows, so large
    negative inputs no longer trigger an overflow warning.
    """
    values = np.asarray(x, dtype=float)
    decay = np.exp(-np.abs(values))
    # x >= 0: 1 / (1 + e^-x);  x < 0: e^x / (1 + e^x) -- same function.
    result = np.where(values >= 0, 1.0 / (1.0 + decay), decay / (1.0 + decay))
    if method == 1:
        # Indexing with () turns a 0-d array back into a numpy scalar and
        # leaves arrays of higher rank unchanged.
        return result[()]
    return np.array(result)
# Derivative of the activation function
def deriv(x):
    """Return the sigmoid derivative s * (1 - s), where s = sigmoid(x, 1)."""
    activation = sigmoid(x, 1)
    complement = 1 - activation
    return activation * complement
# Mean squared error loss
def mse_loss(y_tr, y_pr):
    """Return the mean of (y_tr - y_pr)^2.

    y_tr, y_pr: scalars or array-likes of matching shape; plain Python
    lists and floats are accepted as well as numpy arrays.
    """
    residual = np.asarray(y_tr, dtype=float) - np.asarray(y_pr, dtype=float)
    return (residual ** 2).mean()
# Cross-entropy loss
def log_loss(y_tr, y_pr):
    """Return the mean binary cross-entropy between labels and predictions.

    y_tr: array-like of labels in [0, 1].
    y_pr: array-like of predicted probabilities.

    Predictions are clipped to [eps, 1 - eps] (eps = float machine epsilon)
    before the logarithm, so a prediction of exactly 0 or 1 yields a finite
    loss instead of NaN/inf.
    """
    eps = np.finfo(float).eps
    labels = np.asarray(y_tr, dtype=float)
    probs = np.clip(np.asarray(y_pr, dtype=float), eps, 1 - eps)
    return (- np.multiply(labels, np.log(probs))
            - np.multiply(1 - labels, np.log(1 - probs))).mean()
class NN():
    """Single-hidden-layer sigmoid network trained one sample at a time.

    Biases are folded into the weight matrices by appending a constant 1 to
    the input vector and to the hidden activation vector.
    """

    def __init__(self, input_num, hidden_num, output_num):
        """Create a network with uniformly random weights in [-0.5, 0.5).

        input_num: number of input features (one bias input is added).
        hidden_num: number of hidden neurons.
        output_num: number of output neurons.
        """
        self.input_num = input_num + 1
        self.hidden_num = hidden_num
        self.output_num = output_num
        # w1: (hidden_num, input_num + 1), w2: (output_num, hidden_num + 1)
        self.w1 = np.random.random(size=(self.hidden_num, self.input_num)) - 0.5
        self.w2 = np.random.random(size=(self.output_num, self.hidden_num + 1)) - 0.5
        self.alpha = 0.01  # learning rate

    # Forward pass
    def fforward(self, input):
        """Run one sample through the network.

        input: 1-D array-like with ``input_num`` features (without bias).

        Returns ``[y, a, inputs]``: the output activations of shape
        (output_num,), the hidden activations with the bias 1 appended, of
        shape (hidden_num + 1,), and the bias-extended input as a column of
        shape (input_num + 1, 1).
        """
        inputs = np.append(np.asarray(input, dtype=float), 1.0)
        inputs = inputs.reshape(self.input_num, 1)
        hidden = sigmoid(np.dot(self.w1, inputs), 1)
        a = np.append(np.ravel(hidden), 1.0)
        y = sigmoid(np.dot(self.w2, a), 1)
        return [y, a, inputs]

    # Backward pass: one stochastic-gradient-descent step
    def bforward(self, input, label):
        """Take one SGD step on the squared error for a single sample.

        input: 1-D array-like of features.
        label: target value (scalar, or array of shape (output_num,)).

        Returns the squared error of the first output before the update.
        """
        y, a, inputs = self.fforward(input)
        # delta of the output layer: dL/dy * sigmoid'(z2), using
        # sigmoid'(z) = y * (1 - y).
        delta_out = 2 * (y - label) * y * (1 - y)
        # delta of the hidden layer: back-propagate through w2 (without its
        # bias column) and multiply by the hidden sigmoid derivative. This
        # derivative factor was missing from the w1 gradient before.
        hidden = a[:-1]
        delta_hidden = np.dot(self.w2[:, :-1].T, delta_out) * hidden * (1 - hidden)
        # Both gradients are formed before either weight matrix changes.
        w2n = np.outer(delta_out, a)
        w1n = np.outer(delta_hidden, inputs)
        self.w1 -= self.alpha * w1n
        self.w2 -= self.alpha * w2n
        return ((y - label) ** 2)[0]
if __name__ == '__main__':
    def fraction_correct(network, samples, targets):
        """Share of samples whose first output, thresholded at 0.5, equals the target."""
        hits = 0
        for sample, target in zip(samples, targets):
            # The sigmoid output exceeds 0.5 exactly when its input exceeds 0.
            predicted = 1 if network.fforward(sample)[0][0] > 0.5 else 0
            if predicted == target:
                hits += 1
        return hits / samples.shape[0]

    # Load the data set and split features from labels
    cancer = datasets.load_breast_cancer()
    cancer_X, cancer_y = cancer.data, cancer.target
    # Standardise the features
    scaler = StandardScaler()
    cancer_X = scaler.fit_transform(cancer_X)
    # Build the network; the hidden layer has ten neurons
    nn = NN(cancer_X.shape[1], 10, 1)

    # Accuracy before training
    print(fraction_correct(nn, cancer_X, cancer_y))

    # One pass of per-sample training, recording the loss of every 30th sample
    loss = []
    for index, (sample, target) in enumerate(zip(cancer_X, cancer_y)):
        sample_loss = nn.bforward(sample, target)
        if index % 30 == 0:
            loss.append(sample_loss)

    # Plot how the recorded loss evolves during training
    loss = np.array(loss)
    plt.title("the change of loss function")
    plt.xlabel("training rounds")
    plt.ylabel("the value of loss function")
    plt.plot(loss)
    plt.show()

    # Accuracy after training
    print(fraction_correct(nn, cancer_X, cancer_y))
卷积神经网络
自己写的版本
我当时写的这个版本只支持固定大小的图片输入,所以很不灵活,代码如下:
import pandas as pd
import numpy as np
# Number of inputs of neural_network's first layer (sets the width of w1).
INPUTS_LEN = 4
# Activation function: adds the non-linearity
def sigmoid(x, method):
    """Return the logistic function 1 / (1 + exp(-x)) of ``x``.

    x: scalar or array-like.
    method: 1 returns a value shaped like ``x`` (a numpy scalar for scalar
        input); any other value returns a Python list with one entry per
        element of ``x`` along its first axis.

    The value is computed from exp(-|x|), which never overflows, so large
    negative inputs no longer trigger an overflow warning.
    """
    values = np.asarray(x, dtype=float)
    decay = np.exp(-np.abs(values))
    # x >= 0: 1 / (1 + e^-x);  x < 0: e^x / (1 + e^x) -- same function.
    result = np.where(values >= 0, 1.0 / (1.0 + decay), decay / (1.0 + decay))
    if method == 1:
        # Indexing with () turns a 0-d array back into a numpy scalar and
        # leaves arrays of higher rank unchanged.
        return result[()]
    return list(result)
# Derivative of the activation function
def deriv(x):
    """Return s * (1 - s) with s = sigmoid(x, 1), the sigmoid's derivative at x."""
    s = sigmoid(x, 1)
    one_minus_s = 1 - s
    return s * one_minus_s
# Mean squared error loss
def mse_loss(y_tr, y_pr):
    """Return the mean squared difference between ``y_tr`` and ``y_pr``.

    Both arguments may be scalars, Python sequences or numpy arrays of
    matching shape.
    """
    difference = np.subtract(y_tr, y_pr)
    return np.mean(np.square(difference))
# Cross-entropy loss
def log_loss(y_tr, y_pr):
    """Return the mean binary cross-entropy of predictions ``y_pr`` against ``y_tr``.

    Predicted probabilities are clipped to [eps, 1 - eps], with eps the
    float machine epsilon, so that predictions of exactly 0 or 1 give a
    finite loss rather than NaN/inf.
    """
    tiny = np.finfo(float).eps
    target = np.asarray(y_tr, dtype=float)
    prob = np.clip(np.asarray(y_pr, dtype=float), tiny, 1 - tiny)
    positive_term = np.multiply(target, np.log(prob))
    negative_term = np.multiply(1 - target, np.log(1 - prob))
    return (- positive_term - negative_term).mean()
# Small fully connected network
class neural_network():
    """Sigmoid network with layout INPUTS_LEN -> 2 -> 2 -> 1.

    ``train`` performs one gradient-descent step on the squared error and
    returns the learning-rate-scaled gradient with respect to the inputs so
    that a preceding layer can continue back-propagation.
    """

    def __init__(self):
        """Draw all weights and biases from a standard normal distribution."""
        self.w1 = np.random.normal(size=INPUTS_LEN*2).reshape(2, INPUTS_LEN)
        self.b1 = np.random.normal(size=2)
        self.w2 = np.random.normal(size=4).reshape(2, 2)
        self.b2 = np.random.normal(size=2)
        self.w3 = np.random.normal(size=2)
        self.b3 = np.random.normal()

    def _forward(self, inputs):
        """Return (x, hi1, ho1, hi2, ho2, sum_o): every layer's pre- and post-activation."""
        x = np.asarray(inputs, dtype=float)
        hi1 = np.dot(self.w1, x) + self.b1
        ho1 = np.asarray(sigmoid(hi1, 1))
        hi2 = np.dot(self.w2, ho1) + self.b2
        ho2 = np.asarray(sigmoid(hi2, 1))
        sum_o = np.dot(self.w3, ho2) + self.b3
        return x, hi1, ho1, hi2, ho2, sum_o

    def feedforward(self, inputs):
        """Return 100 times the network's sigmoid output for ``inputs``."""
        sum_o = self._forward(inputs)[-1]
        output = 100 * sigmoid(sum_o, 1)
        return output

    def train(self, input, answer):
        """Run one gradient-descent step for a single sample.

        input: 1-D array-like with INPUTS_LEN values.
        answer: target for the (unscaled) sigmoid output.

        Returns a 1-D array holding learn_rate * dL/d(input), with
        L = (answer - y_pr)^2, evaluated with the weights as they were
        before this update.

        Changes from the earlier implementation: the learning rate is
        applied once per parameter (it used to be multiplied in again at
        every layer, giving lr, lr^2 and lr^3), and the input gradient is
        computed directly from the first-layer weights instead of dividing
        the weight update by the input value, which broke for zero inputs.
        """
        learn_rate = 0.9
        x, hi1, ho1, hi2, ho2, sum_o = self._forward(input)
        y_pr = sigmoid(sum_o, 1)
        print('y_pr为:',y_pr)
        # Error signal (delta) of each layer, output first.
        delta_o = -2 * (answer - y_pr) * deriv(sum_o)
        delta_2 = delta_o * self.w3 * deriv(hi2)
        delta_1 = np.dot(self.w2.T, delta_2) * deriv(hi1)
        # Gradient w.r.t. the inputs, taken before w1 is modified.
        d_l_d_input = learn_rate * np.dot(self.w1.T, delta_1)
        # Parameter updates.
        self.w3 -= learn_rate * delta_o * ho2
        self.b3 -= learn_rate * delta_o
        self.w2 -= learn_rate * np.outer(delta_2, ho1)
        self.b2 -= learn_rate * delta_2
        self.w1 -= learn_rate * np.outer(delta_1, x)
        self.b1 -= learn_rate * delta_1
        return d_l_d_input

    def show(self):
        """Print every weight matrix and bias."""
        print('w1:',self.w1)
        print('b1:',self.b1)
        print('w2:',self.w2)
        print('b2:',self.b2)
        print('w3:',self.w3)
        print('b3:',self.b3)
from PIL import Image
import numpy as np
from neural_network import *
# Find the second-largest value of a matrix
def find_second(np_matrix):
    """Return ``(value, flat_index)`` of the second-largest element.

    np_matrix: 2-D array-like with at least two elements.

    The index refers to the row-major flattening of the matrix. When the
    maximum occurs more than once, the second occurrence is returned.

    The largest element is masked with -inf rather than 0, so matrices whose
    values are all negative now return their real second-largest element.

    Raises ValueError if the matrix has fewer than two elements.
    """
    flat = np.asarray(np_matrix, dtype=float).ravel()
    if flat.size < 2:
        raise ValueError("find_second needs at least two elements")
    masked = flat.copy()
    masked[np.argmax(masked)] = -np.inf
    max2_index = int(np.argmax(masked))
    max_num = flat[max2_index]
    return (max_num, max2_index)
# Convolutional network
class convolnetwork:
    """One 3x3 convolution kernel, a 2x2 pooling stage and a small dense network.

    Pipeline per training step: ``press`` reduces the image to an 8x8 grid,
    ``convolve`` produces a 6x6 map, ``pooling`` reduces it to four values
    and ``back_propagation`` updates the dense network and the kernel.
    """

    def __init__(self,pixel):
        """Store the image and create a random kernel plus the dense network.

        pixel: image array; it is now kept on the instance (it used to be
        discarded and replaced by an empty list).
        """
        self.pixel = pixel
        self.conkernels = np.random.uniform(low = -1.0,high = 1.0,size =9).reshape(3,3)
        self.convol_maps = []
        self.poolings = []
        self.pool_nums = []
        self.softmax = neural_network()

    def show(self):
        """Print the convolution kernel and the dense network's parameters."""
        print('卷积核如下:')
        print(self.conkernels)
        print('softmax层如下:')
        self.softmax.show()

    # Reduce the image to an 8x8 matrix
    def press(self):
        """Replace ``self.pixel`` by an 8x8 grid of tile means scaled to [-1, 1].

        Each of the first three channels is cut into 8x8 tiles of 130 rows
        by 195 columns; the tile means are mapped from [0, 255] to [-1, 1]
        and the three channels are averaged.
        NOTE(review): the fixed tile size assumes an image of at least
        1040 x 1560 pixels with 8-bit channels -- confirm against the input.
        """
        tile_rows, tile_cols = 130, 195
        channel_grids = []
        for channel in range(3):
            plane = self.pixel[:,:,channel]
            grid = []
            for j in range(8):
                grid.append([
                    np.array(plane[j*tile_rows:(j+1)*tile_rows,
                                   k*tile_cols:(k+1)*tile_cols]).mean()
                    for k in range(8)
                ])
            channel_grids.append(grid)
        scaled = np.array(channel_grids) / 255 * 2 - 1
        self.pixel = (scaled[0] + scaled[1] + scaled[2])/3

    # Convolution of the kernel with the reduced image
    def convolve(self):
        """Fill ``self.convol_maps`` with the 6x6 map of mean(patch * kernel).

        The map is built locally and assigned in one step, so the method can
        be called repeatedly.
        """
        rows = []
        for j in range(6):
            rows.append([
                (self.pixel[j:j+3,k:k+3]*self.conkernels).mean()
                for k in range(6)
            ])
        self.convol_maps = np.array(rows)

    # Pooling over the four 3x3 quadrants
    def pooling(self):
        """Pool each 3x3 quadrant of the 6x6 map to one value.

        The pooled value is the average of the quadrant's largest and
        second-largest entries. For both entries the centre coordinates of
        the corresponding 3x3 image patch (map position + 1) are stored in
        ``self.pool_nums`` as [row, column, row2, column2].
        """
        pooled = []
        positions = []
        for i in range(2):
            for j in range(2):
                region = np.array(self.convol_maps[i*3:(i+1)*3,j*3:(j+1)*3])
                max_index = np.argmax(region)
                max2_num,max2_index = find_second(region)
                row_index = int(max_index // 3 + i*3) + 1
                column_index = max_index%3 + j*3 + 1
                row2_index = int(max2_index // 3 + i*3) + 1
                column2_index = max2_index%3 + j*3 + 1
                positions.append([row_index,column_index,row2_index,column2_index])
                pooled.append((region.max()+max2_num)/2.0)
        self.pool_nums = positions
        self.poolings = np.array(pooled)

    # Back-propagation
    def back_propagation(self,answer):
        """Train the dense network on the pooled values and update the kernel.

        answer: target passed on to the dense network's ``train``.
        """
        d_l_d_inputs = self.softmax.train(self.poolings,answer)
        for grad, (row, column, row2, column2) in zip(d_l_d_inputs, self.pool_nums):
            piece = self.pixel[row-1:row+2,column-1:column+2]
            piece2 = self.pixel[row2-1:row2+2,column2-1:column2+2]
            # NOTE(review): the pooled value is (mean(piece*k) + mean(piece2*k)) / 2,
            # so the chain rule gives a factor 1/18 here; the 9/2 factor is kept
            # as found -- confirm whether it is an intended step-size scaling.
            self.conkernels -= 9.0/2.0*(piece*grad+piece2*grad)

    # One training step (back-propagation + SGD) on a single image
    def train(self,pixel,answer):
        """Run press, convolve, pooling and back_propagation on ``pixel``."""
        self.pixel = pixel
        self.convol_maps = []
        self.poolings = []
        self.pool_nums = []
        self.press()
        self.convolve()
        self.pooling()
        self.back_propagation(answer)
# Demo: one training step on a single image. The context manager closes the
# image file once its pixels have been copied into a numpy array.
with Image.open('xjt.jpg') as img:
    pixels = np.array(img)
convol1 = convolnetwork(pixels)
convol1.show()
convol1.train(pixels,1)
convol1.show()
Tensorflow版本
CNN代码如下:
import os

import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from tensorflow import keras
# Everything comes from tensorflow.keras so that the layers, the model and the
# optimizer belong to the same Keras implementation.
from tensorflow.keras import Sequential
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Dense, Flatten, Dropout

# Load the data
(train_data,train_label),(test_data,test_label) = cifar10.load_data()

# Scale pixel values to [0, 1].
# Naming: x_* holds the training split, y_* holds the test split.
x_data = train_data.astype('float32')/255
y_data = test_data.astype('float32')/255


# Label preprocessing
def one_hot(label,num_classes):
    """Return the one-hot encoding of the integer array ``label``."""
    label_one_hot = np.eye(num_classes)[label]
    return label_one_hot


num_classes = 10
train_label = train_label.astype('int32')
train_label = np.squeeze(train_label)
x_label = one_hot(train_label,num_classes)
test_label = test_label.astype('int32')
y_label = np.squeeze(test_label)
print(train_label[0:5])
print(x_label[0:5])

# Build the network
cnn = Sequential()
# unit1
cnn.add(Conv2D(32,kernel_size=[3,3],input_shape=(32,32,3),activation='relu',padding='same'))
cnn.add(Conv2D(32,kernel_size=[3,3],activation='relu',padding='same'))
cnn.add(Conv2D(32,kernel_size=[3,3],activation='relu',padding='same'))
cnn.add(MaxPooling2D(pool_size=[2,2],padding='same'))
cnn.add(Conv2D(32,kernel_size=[3,3],activation='relu',padding='same'))
cnn.add(MaxPooling2D(pool_size=[2,2],padding='same'))
cnn.add(Dropout(0.5))
# unit2
cnn.add(Conv2D(64,kernel_size=[3,3],activation='relu',padding='same'))
cnn.add(Conv2D(64,kernel_size=[3,3],activation='relu',padding='same'))
cnn.add(Conv2D(64,kernel_size=[3,3],activation='relu',padding='same'))
cnn.add(MaxPooling2D(pool_size=[2,2],padding='same'))
cnn.add(Dropout(0.5))
cnn.add(Flatten())
cnn.add(Dense(512,activation='relu'))
cnn.add(Dropout(0.5))
cnn.add(Dense(128,activation='relu'))
cnn.add(Dropout(0.5))
# categorical_crossentropy expects a probability distribution over the ten
# classes, so the output layer uses softmax (it used relu before).
cnn.add(Dense(10,activation='softmax'))
cnn.summary()

# Compile the model; `learning_rate` replaces the deprecated `lr` argument
cnn.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),loss='categorical_crossentropy',metrics=['acc'])

# Train the model
history_cnn = cnn.fit(x_data,x_label,epochs=20,batch_size=32,shuffle=True,verbose=1,validation_split=0.1)

# Plot the loss and accuracy curves
plt.figure(1)
plt.plot(np.array(history_cnn.history['loss']))
plt.plot(np.array(history_cnn.history['val_loss']))
plt.xlabel('Epoch')
plt.ylabel('Train loss')
plt.legend(['loss','val_loss'])
plt.show()
plt.figure(2)
plt.plot(np.array(history_cnn.history['acc']))
plt.plot(np.array(history_cnn.history['val_acc']))
plt.xlabel('Epoch')
plt.ylabel('Accuracy')  # this figure shows accuracy, not loss
plt.legend(['acc','val_acc'])
plt.show()

# Save the model; create the target directory first so save() cannot fail
# on a missing folder
os.makedirs('model', exist_ok=True)
cnn.save('model/cnn.h5')

# Predict on the test split with the reloaded model
cnn = keras.models.load_model('model/cnn.h5')
test_out = cnn.predict(y_data)

# Test accuracy: share of test images whose arg-max class equals the label
total_num = y_data.shape[0]
num = int(np.sum(np.argmax(test_out, axis=1) == y_label))
accuracy = num/total_num
print(accuracy)
Pytorch版本
CNN代码如下:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
import torch.optim as optim
import time
import os
# Data preprocessing
# Training pipeline: random flip / random grayscale augmentation, then tensor
# conversion and per-channel normalisation with mean 0.5 and std 0.5.
transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomGrayscale(),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
])
# Test pipeline: the same conversion and normalisation without augmentation.
transform1 = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
])

# Load CIFAR10 (downloaded into ./data when missing) and wrap both splits in loaders
trainset = torchvision.datasets.CIFAR10(
    root='./data', train=True, download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(
    dataset=trainset, batch_size=100, shuffle=True, num_workers=2)
testset = torchvision.datasets.CIFAR10(
    root='./data', train=False, download=True, transform=transform1)
testloader = torch.utils.data.DataLoader(
    dataset=testset, batch_size=50, shuffle=False, num_workers=2)

# Class names, ten entries
classes = (
    'plane', 'car', 'bird', 'cat', 'deer',
    'dog', 'frog', 'horse', 'ship', 'truck',
)
# Convolutional network definition
class Net(nn.Module):
    """Thirteen-convolution CNN with three fully connected layers and ten outputs.

    Each stage is conv layers -> max-pool -> batch-norm -> ReLU. The 1x1
    convolutions use padding=1, which enlarges the feature map by two pixels;
    for 32x32 inputs the spatial size goes 32 -> 16 -> 9 -> 6 -> 5 -> 4, which
    is what the 512*4*4 input width of ``fc14`` relies on.
    """

    def __init__(self):
        """Create all layers (attribute names are the state_dict keys)."""
        super(Net,self).__init__()
        self.conv1 = nn.Conv2d(3,64,3,padding=1)
        self.conv2 = nn.Conv2d(64,64,3,padding=1)
        self.pool1 = nn.MaxPool2d(2, 2)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu1 = nn.ReLU()

        self.conv3 = nn.Conv2d(64,128,3,padding=1)
        self.conv4 = nn.Conv2d(128, 128, 3,padding=1)
        self.pool2 = nn.MaxPool2d(2, 2, padding=1)
        self.bn2 = nn.BatchNorm2d(128)
        self.relu2 = nn.ReLU()

        self.conv5 = nn.Conv2d(128,128, 3,padding=1)
        self.conv6 = nn.Conv2d(128, 128, 3,padding=1)
        self.conv7 = nn.Conv2d(128, 128, 1,padding=1)
        self.pool3 = nn.MaxPool2d(2, 2, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.relu3 = nn.ReLU()

        self.conv8 = nn.Conv2d(128, 256, 3,padding=1)
        self.conv9 = nn.Conv2d(256, 256, 3, padding=1)
        self.conv10 = nn.Conv2d(256, 256, 1, padding=1)
        self.pool4 = nn.MaxPool2d(2, 2, padding=1)
        self.bn4 = nn.BatchNorm2d(256)
        self.relu4 = nn.ReLU()

        self.conv11 = nn.Conv2d(256, 512, 3, padding=1)
        self.conv12 = nn.Conv2d(512, 512, 3, padding=1)
        self.conv13 = nn.Conv2d(512, 512, 1, padding=1)
        self.pool5 = nn.MaxPool2d(2, 2, padding=1)
        self.bn5 = nn.BatchNorm2d(512)
        self.relu5 = nn.ReLU()

        self.fc14 = nn.Linear(512*4*4,1024)
        # Plain element-wise dropout: the inputs here are flat feature
        # vectors, not the channel maps Dropout2d is meant for.
        self.drop1 = nn.Dropout()
        self.fc15 = nn.Linear(1024,1024)
        self.drop2 = nn.Dropout()
        self.fc16 = nn.Linear(1024,10)

    # Forward pass
    def forward(self,x):
        """Return the class scores of shape (batch, 10) for a batch of images ``x``."""
        x = self.relu1(self.bn1(self.pool1(self.conv2(self.conv1(x)))))
        x = self.relu2(self.bn2(self.pool2(self.conv4(self.conv3(x)))))
        x = self.relu3(self.bn3(self.pool3(self.conv7(self.conv6(self.conv5(x))))))
        x = self.relu4(self.bn4(self.pool4(self.conv10(self.conv9(self.conv8(x))))))
        x = self.relu5(self.bn5(self.pool5(self.conv13(self.conv12(self.conv11(x))))))
        x = x.view(-1,512*4*4)
        x = self.drop1(F.relu(self.fc14(x)))
        x = self.drop2(F.relu(self.fc15(x)))
        x = self.fc16(x)
        return x

    # Training loop (mini-batch training with the Adam optimizer)
    def train_sgd(self,device):
        """Train on the module-level ``trainloader`` up to epoch 100.

        device: torch device the batches are moved to.

        A checkpoint is written to 'weights.tar' after every epoch; if that
        file exists, model and optimizer state are restored from it and
        training resumes with the epoch after the stored one.
        """
        optimizer = optim.Adam(self.parameters(), lr=0.0001)
        # The loss has no state worth saving, so it is always built here and
        # no longer pickled into the checkpoint.
        loss = nn.CrossEntropyLoss()
        path = 'weights.tar'
        initepoch = 0
        if os.path.exists(path):
            # map_location lets a checkpoint written on a GPU load on a CPU host.
            checkpoint = torch.load(path, map_location=device)
            self.load_state_dict(checkpoint['model_state_dict'])
            optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
            # The stored epoch has been completed, so resume with the next one.
            initepoch = checkpoint['epoch'] + 1

        # Enable dropout and batch-norm statistics updates.
        self.train()
        for epoch in range(initepoch,100):  # loop over the dataset multiple times
            timestart = time.time()
            running_loss = 0.0
            total = 0
            correct = 0
            for i, (inputs, labels) in enumerate(trainloader, 0):
                inputs, labels = inputs.to(device),labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward + backward + optimize
                outputs = self(inputs)
                l = loss(outputs, labels)
                l.backward()
                optimizer.step()

                # Statistics are accumulated over every batch of the
                # reporting window, not only the batch that triggers the print.
                running_loss += l.item()
                total += labels.size(0)
                correct += (outputs.argmax(dim=1) == labels).sum().item()
                if i % 500 == 499:  # print every 500 mini-batches
                    print('[%d, %5d] loss: %.4f' %
                          (epoch, i, running_loss / 500))
                    running_loss = 0.0
                    print('Accuracy of the network on the %d tran images: %.3f %%' % (total,
                            100.0 * correct / total))
                    total = 0
                    correct = 0

            # Save this instance's own weights (not a module-level `net`).
            torch.save({'epoch':epoch,
                        'model_state_dict':self.state_dict(),
                        'optimizer_state_dict':optimizer.state_dict(),
                        },path)
            print('epoch %d cost %3f sec' %(epoch,time.time()-timestart))
        print('Finished Training')

    # Evaluate on the test set and print the accuracy
    def test(self,device):
        """Print the accuracy on the module-level ``testloader``.

        The model is switched to eval mode first so that dropout is disabled
        and batch-norm uses its running statistics.
        """
        self.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for images, labels in testloader:
                images, labels = images.to(device), labels.to(device)
                outputs = self(images)
                predicted = outputs.argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        print('Accuracy of the network on the 10000 test images: %.3f %%' % (
                100.0 * correct / total))
if __name__ == '__main__':
    # Use the first GPU when CUDA is available, otherwise the CPU.
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda:0" if use_cuda else "cpu")
    net = Net().to(device)
    # Train, then report the test-set accuracy.
    net.train_sgd(device)
    net.test(device)