线性回归模型

1, 准备数据

  1. import numpy as np
  2. import pandas as pd
  3. from matplotlib import pyplot as plt
  4. import tensorflow as tf
  5. # 样本数量
  6. n = 400
  7. # 生成测试用的数据集
  8. X = tf.random.uniform([n,2], minval=-10, maxval=10)
  9. w0 = tf.constant([[2.0],[-3.0]])
  10. b0 = tf.constant([[3.0]])
  11. Y = X@w0 + b0 + tf.random.normal([n,1], mean=0.0, stddev=2.0) # @表示矩阵乘法
  1. # 数据可视化
  2. %matplotlib inline
  3. %config InlineBackend.figure_format = 'svg'
  4. plt.figure(figsize = (12,5))
  5. ax1 = plt.subplot(121)
  6. ax1.scatter(X[:,0],Y[:,0], c = "b")
  7. plt.xlabel("x1")
  8. plt.ylabel("y",rotation = 0)
  9. ax2 = plt.subplot(122)
  10. ax2.scatter(X[:,1],Y[:,0], c = "g")
  11. plt.xlabel("x2")
  12. plt.ylabel("y",rotation = 0)
  13. plt.show()
  1. # 构建数据管道迭代器
  2. def data_iter(features, labels, batch_size=8):
  3. num_examples = len(features)
  4. indices = list(range(num_examples))
  5. np.random.shuffle(indices)
  6. for i in range(0, num_examples, batch_size):
  7. indexs = indices[i: min(i+batch_size, num_examples)]
  8. yield tf.gather(features, indexs), tf.gather(labels, indexs)
  9. # 测试数据管道效果
  10. batch_size = 8
  11. (feature, labels) = next(data_iter(X,Y,batch_size))
  12. print(features)
  13. print(labels)

2, 定义模型

  1. w = tf.Variable(tf.random.normal(w0.shape))
  2. b = tf.Variable(tf.zero_like(b0, dtype=tf.float32))
  3. # 定义模型
  4. class LinearRegression:
  5. # 正向传播
  6. def __call__(self,x):
  7. return x@w + b
  8. # 损失函数
  9. def loss_func(self, y_true, y_pred):
  10. return tf.reduce_mean((y_ture - y_pred)**2/2)
  11. model = LinearRegression()

3, 训练模型

  1. # 使用动态图调试
  2. def train_step(model, features, labels):
  3. with tf.GradientTape() as tape:
  4. predictions = model(features)
  5. loss = model.loss_func(labels, predictions)
  6. # 方向传播求梯度
  7. w.assign(w - 0.001*dloss_dw)
  8. b.assign(b - 0.001*dloss_db)
  9. return loss
  1. # 测试train_step效果
  2. batch_size = 10
  3. (features,labels) = next(data_iter(X,Y,batch_size))
  4. train_step(model,features,labels)
  1. def train_model(model,epochs):
  2. for epoch in tf.range(1,epochs+1):
  3. for features, labels in data_iter(X,Y,10):
  4. loss = train_step(model,features,labels)
  5. if epoch%50==0:
  6. printbar()
  7. tf.print("epoch =",epoch,"loss = ",loss)
  8. tf.print("w =",w)
  9. tf.print("b =",b)
  10. train_model(model,epochs = 200)

使用autograph机制转换成静态图加速

  1. @tf.function
  2. def train_step(model, features, labels):
  3. with tf.GradientTape() as tape:
  4. predictions = model(features)
  5. loss = model.loss_func(labels, predictions)
  6. # 反向传播求梯度
  7. dloss_dw, dloss_db = tape.gradient(loss, [w,b])
  8. # 梯度下降法更新参数
  9. w.assign(w - 0.001*dloss_dw)
  10. b.assign(b - 0.001*dloss_db)
  11. return loss
  12. def train_model(model, epochs):
  13. for epoch in tf.range(1,epochs+1):
  14. for features, labels in data_iter(X,Y,10):
  15. loss = train_step(model, features, labels)
  16. if epoch%50==0:
  17. printbar()
  18. tf.print("epoch =",epoch,"loss = ",loss)
  19. tf.print("w =",w)
  20. tf.print("b =",b)
  21. train_model(model,epochs = 200)

DNN二分类模型

1, 准备数据

import numpy as np 
import pandas as pd 
from matplotlib import pyplot as plt
import tensorflow as tf
%matplotlib inline
%config InlineBackend.figure_format = 'svg'

#正负样本数量
n_positive,n_negative = 2000,2000

#生成正样本, 小圆环分布
r_p = 5.0 + tf.random.truncated_normal([n_positive,1],0.0,1.0)
theta_p = tf.random.uniform([n_positive,1],0.0,2*np.pi) 
Xp = tf.concat([r_p*tf.cos(theta_p),r_p*tf.sin(theta_p)],axis = 1)
Yp = tf.ones_like(r_p)

#生成负样本, 大圆环分布
r_n = 8.0 + tf.random.truncated_normal([n_negative,1],0.0,1.0)
theta_n = tf.random.uniform([n_negative,1],0.0,2*np.pi) 
Xn = tf.concat([r_n*tf.cos(theta_n),r_n*tf.sin(theta_n)],axis = 1)
Yn = tf.zeros_like(r_n)

#汇总样本
X = tf.concat([Xp,Xn],axis = 0)
Y = tf.concat([Yp,Yn],axis = 0)


#可视化
plt.figure(figsize = (6,6))
plt.scatter(Xp[:,0].numpy(),Xp[:,1].numpy(),c = "r")
plt.scatter(Xn[:,0].numpy(),Xn[:,1].numpy(),c = "g")
plt.legend(["positive","negative"]);

构建数据管道迭代器

def data_iter(features, labels, batch_size=8):
    num_examples = len(features)
    indices = list(range(num_exampels))
    np.random.shuffle(indices)
    for i in range(0, num_examples, batch_size):
        indexs = indices[i: min(i + batch_size, num_examples)]
        yield tf.gather(features, indexs), tf.gather(labels,indexs)

2, 定义模型

此处范例我们利用tf.Module来组织模型变量

class DNNModel(tf.Module):
    def __init__(self,name = None):
        super(DNNModel, self).__init__(name=name)
        self.w1 = tf.Variable(tf.random.truncated_normal([2,4]),dtype = tf.float32)
        self.b1 = tf.Variable(tf.zeros([1,4]),dtype = tf.float32)
        self.w2 = tf.Variable(tf.random.truncated_normal([4,8]),dtype = tf.float32)
        self.b2 = tf.Variable(tf.zeros([1,8]),dtype = tf.float32)
        self.w3 = tf.Variable(tf.random.truncated_normal([8,1]),dtype = tf.float32)
        self.b3 = tf.Variable(tf.zeros([1,1]),dtype = tf.float32)


    # 正向传播
    @tf.function(input_signature=[tf.TensorSpec(shape = [None,2], dtype = tf.float32)])  
    def __call__(self,x):
        x = tf.nn.relu(x@self.w1 + self.b1)
        x = tf.nn.relu(x@self.w2 + self.b2)
        y = tf.nn.sigmoid(x@self.w3 + self.b3)
        return y

    # 损失函数(二元交叉熵)
    @tf.function(input_signature=[tf.TensorSpec(shape = [None,1], dtype = tf.float32),
                              tf.TensorSpec(shape = [None,1], dtype = tf.float32)])  
    def loss_func(self,y_true,y_pred):  
        #将预测值限制在 1e-7 以上, 1 - 1e-7 以下,避免log(0)错误
        eps = 1e-7
        y_pred = tf.clip_by_value(y_pred,eps,1.0-eps)
        bce = - y_true*tf.math.log(y_pred) - (1-y_true)*tf.math.log(1-y_pred)
        return  tf.reduce_mean(bce)

    # 评估指标(准确率)
    @tf.function(input_signature=[tf.TensorSpec(shape = [None,1], dtype = tf.float32),
                              tf.TensorSpec(shape = [None,1], dtype = tf.float32)]) 
    def metric_func(self,y_true,y_pred):
        y_pred = tf.where(y_pred>0.5,tf.ones_like(y_pred,dtype = tf.float32),
                          tf.zeros_like(y_pred,dtype = tf.float32))
        acc = tf.reduce_mean(1-tf.abs(y_true-y_pred))
        return acc

model = DNNModel()

3, 训练模型