1.引入需要的库

  • torch.optim 是一个实现了各种优化算法的库。
  • SciPy 是一个开源的 Python 算法库和数学工具包,基于 Numpy 的科学计算库
  • 很多数据集都是mat格式的标注信息,使用模块scipy.io的函数loadmat和savemat可以实现Python对mat数据的读写。
  • Pyplot 是 Matplotlib 的子库,提供了和 MATLAB 类似的绘图 API。Pyplot 是常用的绘图模块,能很方便让用户绘制 2D 图表。
  • scikit-learn是基于Python语言的机器学习库
  • PCA 用于对具有一组连续正交分量的多变量数据集进行方差最大化的分解。
  • 利用 scikit-learn 包中的 train_test_split 辅助函数可以很快地将实验数据集划分为任何训练集(training sets)和测试集(test sets)

    1. import torch
    2. import torch.optim as optim
    3. import torch.nn as nn
    4. import torch.nn.functional as F
    5. import numpy as np
    6. from scipy.io import loadmat, savemat
    7. import matplotlib.pyplot as plt
    8. from sklearn.decomposition import PCA
    9. from sklearn.model_selection import train_test_split
    10. import random

    2.设置训练的随机种子和其他环境参数

  • deterministic置为True的话,每次返回的卷积算法将是确定的,即默认算法;配合上设置 Torch 的随机种子为固定值的话,应该可以保证每次运行网络的时候相同输入的输出是固定的

  • 设置 torch.backends.cudnn.benchmark=True 将会让程序在开始时花费一点额外时间,为整个网络的每个卷积层搜索最适合它的卷积实现算法,进而实现网络的加速。适用场景是网络结构固定(不是动态变化的),网络的输入形状(包括 batch size,图片大小,输入的通道)是不变

    1. """
    2. Some tools function listed below:
    3. """
    4. def set_random_seed(mySeed=0):
    5. torch.manual_seed(mySeed) # 设定生成随机数的种子,并返回一个 torch._C.Generator 对象.
    6. torch.cuda.manual_seed(mySeed)
    7. torch.cuda.manual_seed_all(mySeed)
    8. torch.backends.cudnn.deterministic = True
    9. torch.backends.cudnn.benchmark = False
    10. np.random.seed(mySeed)
    11. random.seed(mySeed)

    3.定义图像变换方法

  • PCA 是一种常见的数据分析方式,常用于高维数据的降维,可用于提取数据的主要特征分量。

  • 用scikit-learn学习主成分分析(PCA)
  • 模型文件(.npy)由一个字典组成,字典中的每一个键对应一层网络模型参数。(包括权重w和偏置b) ```python def max_min_mean(img): “”” 计算图像的 max value ,min value ,mean value. “”” print(‘max: ‘,np.max(img),’min: ‘,np.min(img),’mean: ‘,np.mean(img))

def map_to_255(img): “”” 将图像映射到 [0,255] “”” return ( img - np.min(img) ) / ( np.max(img)-np.min(img) ) * 255

def applyPCA(X, numComponents): “”” 对图像应用PCA主成分分析法,从而降低图像维度 参数:numComponents:应用PCA降维后的特征维度数目 “”” newX = np.reshape(X, (-1, X.shape[2])) pca = PCA(n_components=numComponents, whiten=True) newX = pca.fit_transform(newX) newX = np.reshape(newX, (X.shape[0], X.shape[1], numComponents)) return newX

def addZeroPadding(X, margin=2): “”” 添加 0 元素填充padding “”” newX = np.zeros(( X.shape[0] + 2 margin, X.shape[1] + 2 margin, X.shape[2] )) newX[margin:X.shape[0]+margin, margin:X.shape[1]+margin, :] = X return newX

def createImgCube(X ,gt ,pos:list ,windowSize=25): “”” create Cube from pos list return imagecube gt nextPos “”” margin = (windowSize-1)//2 zeroPaddingX = addZeroPadding(X, margin=margin) dataPatches = np.zeros((pos.len(), windowSize, windowSize, X.shape[2])) if( pos[-1][1]+1 != X.shape[1] ): nextPos = (pos[-1][0] ,pos[-1][1]+1) elif( pos[-1][0]+1 != X.shape[0] ): nextPos = (pos[-1][0]+1 ,0) else: nextPos = (0,0) return np.array([zeroPaddingX[i:i+windowSize, j:j+windowSize, :] for i,j in pos ]),\ np.array([gt[i,j] for i,j in pos]) ,\ nextPos

def createPos(shape:tuple, pos:tuple, num:int): “”” creatre pos list after the given pos “”” if (pos[0]+1)(pos[1]+1)+num >shape[0]shape[1]: num = shape[0]shape[1]-( (pos[0])shape[1] + pos[1] ) return [(pos[0]+(pos[1]+i)//shape[1] , (pos[1]+i)%shape[1] ) for i in range(num) ]

def createPosWithoutZero(gt): “”” creatre pos list without zero labels “”” mask = gt>0 return [(i,j) for i , row in enumerate(mask) for j , row_element in enumerate(row) if row_element]

def splitTrainTestSet(X, gt, testRatio, randomState=456): “”” random split data set “”” X_train, X_test, gt_train, gt_test = train_test_split(X, gt, test_size=testRatio, random_state=randomState, stratify=gt) return X_train, X_test, gt_train, gt_test

def createImgPatch(lidar, pos:list, windowSize=25): “”” return lidar Img patches “”” margin = (windowSize-1)//2 zeroPaddingLidar = np.zeros(( lidar.shape[0] + 2 margin, lidar.shape[1] + 2 margin )) zeroPaddingLidar[margin:lidar.shape[0]+margin, margin:lidar.shape[1]+margin] = lidar return np.array([zeroPaddingLidar[i:i+windowSize, j:j+windowSize] for i,j in pos ])

  1. <a name="CQmAL"></a>
  2. ## 4.初始化数据集
  3. - windowSize:窗口大小
  4. - test_rate:验证集和训练集之比
  5. - ground_truth:数据标注的真实值
  6. ```python
  7. windowSize = 11
  8. test_rate = 0.9
  9. ##########################################################
  10. set_random_seed(mySeed=7) # mySeed = 4,95.5左右
  11. # 初始化数据集
  12. houston_gt = loadmat('./houston_gt.mat')['houston_gt'] # groundtruth
  13. houston_hsi = loadmat('./houston_hsi.mat')['houston_hsi']
  14. # shape = (349, 1905)
  15. houston_lidar = loadmat('./houston_lidar.mat')['houston_lidar']
  16. # 通过PCA方法,将高光谱图像降维,此时的通道数为 30
  17. # shape = (349, 1905, 30)
  18. houston_hsi = applyPCA(houston_hsi,30)
  19. # 训练数据集设置
  20. # 位置编码
  21. trainPos = np.load("./trainpos.npy").astype(np.int32).tolist()
  22. # len(trainPos) = 2832
  23. trainPos = [(array[1]-1,array[0]-1) for array in trainPos]
  24. # shape = torch.Size([2832, 1, 30, 11, 11])
  25. hsiTrain_tr, labels_tr, _ = createImgCube(houston_hsi, houston_gt, trainPos, windowSize=windowSize)
  26. hsiTrain_tr = torch.from_numpy(hsiTrain_tr.transpose((0,3,1,2))).unsqueeze(1).float()
  27. # shape = torch.Size([2832, 1, 11, 11])
  28. lidarTrain_tr = createImgPatch(houston_lidar, trainPos, windowSize=windowSize)
  29. lidarTrain_tr = torch.from_numpy(lidarTrain_tr).unsqueeze(1).float()
  30. # 测试数据集设置
  31. testPos = list(set(createPosWithoutZero(houston_gt))-set(trainPos))
  32. hsiTest_te, labels_te , _ = createImgCube(houston_hsi, houston_gt, testPos, windowSize=windowSize)
  33. hsiTest_te = torch.from_numpy(hsiTest_te.transpose((0,3,1,2))).unsqueeze(1).float()
  34. lidarTest_te = createImgPatch(houston_lidar, testPos, windowSize=windowSize)
  35. lidarTest_te = torch.from_numpy(lidarTest_te).unsqueeze(1).float()

5.定义数据集加载类

  • num_workers:用多少个子进程加载数据。0表示数据将在主进程中加载(默认: 0)
  • batch_size:批量大小设置为128 ```python

    创建 trainloader 和 testloader

    class TrainDS2832(torch.utils.data.Dataset): def _init(self):
    1. self.len = labels_tr.shape[0]
    2. self.hsi = torch.FloatTensor(hsiTrain_tr)
    3. self.lidar = torch.FloatTensor(lidarTrain_tr)
    4. self.labels = torch.LongTensor(labels_tr-1)
    def getitem(self, index):
      # 根据索引返回数据和对应的标签
      return self.hsi[index], self.lidar[index] , self.labels[index]
    
    def len(self):
      # 返回文件数据的数目
      return self.len
    

class TestDS2832(torch.utils.data.Dataset): def init(self): self.len = labelste.shape[0] self.hsi = torch.FloatTensor(hsiTest_te) self.lidar = torch.FloatTensor(lidarTest_te) self.labels = torch.LongTensor(labels_te-1)
def __getitem
(self, index):

    # 根据索引返回数据和对应的标签
    return self.hsi[index], self.lidar[index] , self.labels[index]
def __len__(self): 
    # 返回文件数据的数目
    return self.len

创建 trainloader 和 testloader

trainset2832 = TrainDS_2832() testset2832 = TestDS_2832()

train_loader = torch.utils.data.DataLoader(dataset=trainset2832, batch_size=128, shuffle=True, num_workers=2) test_loader = torch.utils.data.DataLoader(dataset=testset2832, batch_size=128, shuffle=False, num_workers=2) print(“Completed”)

<a name="YZS35"></a>
## 6.设置代码运行时
```python
# 使用GPU训练,可以在菜单 "代码执行工具" -> "更改运行时类型" 里进行设置
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
if torch.cuda.is_available():
  print("using cuda:0 as device")
else:
  print("using cpu as device")

7.定义并实现网络模型

1.svg

class LiSTNet(nn.Module):
  def __init__(self):
    super(LiSTNet, self).__init__()

    self.lidar_step1 = nn.Sequential(
      nn.Conv2d(in_channels=1,out_channels=64,kernel_size=3,padding=0)
      , nn.BatchNorm2d(num_features=64)
      , nn.ReLU(inplace=True)
    )
    self.lidar_step2 = nn.Sequential(
      nn.Conv2d(in_channels=64,out_channels=128,kernel_size=3,padding=0)
      , nn.BatchNorm2d(num_features=128)
      , nn.ReLU(inplace=True)
    )
    self.lidar_step3 = nn.Sequential(
      nn.Conv2d(in_channels=128,out_channels=256,kernel_size=3,padding=0)
      , nn.BatchNorm2d(num_features=256)
      , nn.ReLU(inplace=True)
    )

    self.hsi_step1 = nn.Sequential(
      nn.Conv3d(in_channels=1, out_channels=8, kernel_size=(9,3,3), padding=0)
      , nn.BatchNorm3d(num_features=8)
      , nn.ReLU(inplace=True)
    )
    self.hsi_step2 = nn.Sequential(
      nn.Conv3d(in_channels=8, out_channels=16, kernel_size=(7,3,3), padding=0)
      , nn.BatchNorm3d(num_features=16)
      , nn.ReLU(inplace=True)
    )
    self.hsi_step3 = nn.Sequential(
      nn.Conv3d(in_channels=16, out_channels=32, kernel_size=(5,3,3), padding=0)
      , nn.BatchNorm3d(num_features=32)
      , nn.ReLU(inplace=True)
    )
    self.hsi_step4 = nn.Sequential(
      nn.Conv2d(in_channels=384, out_channels=256, kernel_size=3, padding=1)
      , nn.BatchNorm2d(num_features=256)
      , nn.ReLU()
    )

    self.drop1 = nn.Dropout(0.6)
    self.drop2 = nn.Dropout(0.6)
    self.drop3 = nn.Dropout(0.6)

    self.fusionlinear_1 = nn.Linear(in_features= 1280,out_features= 15)
    self.fusionlinear_2 = nn.Linear(in_features= 1280,out_features= 15)
    self.fusionlinear_3 = nn.Linear(in_features= 2560, out_features= 15)
    self.weight = nn.Parameter(torch.ones(2))

  def forward(self,hsi,lidar):
    x1 = self.lidar_step1(lidar)
    x2 = self.lidar_step2(x1)
    x3 = self.lidar_step3(x2)
    x4 = x3.reshape(-1, x3.shape[1], x3.shape[2]*x3.shape[3])

    y1 = self.hsi_step1(hsi)
    y2 = self.hsi_step2(y1)
    y3 = self.hsi_step3(y2)
    y4 = y3.reshape(-1,32*12,y3.shape[3],y3.shape[4])
    y5 = self.hsi_step4(y4)
    y6 = y5.reshape(-1, y5.shape[1], y5.shape[2]*y5.shape[3])

    f1 = torch.cat((x4,y6),dim=1)   ## 直接将HSI与LiDAR拼接

    x = F.max_pool1d(x4,kernel_size=5)
    x = x.reshape(-1, x.shape[1] * x.shape[2]) # x4
    y = F.max_pool1d(y6,kernel_size=5)
    y = y.reshape(-1, y.shape[1] * y.shape[2]) # y6
    xy_ = F.max_pool1d(f1,kernel_size=5)
    xy_ = xy_.reshape(-1, xy_.shape[1] * xy_.shape[2]) # xy -- 2560

    x = self.drop1(x)
    y = self.drop2(y)
    xy_ = self.drop3(xy_)
    output_x = self.fusionlinear_1(x)
    output_y = self.fusionlinear_2(y)
    output_xy = self.fusionlinear_3(xy_)

    #weight = F.sigmoid(self.weight)
    weight = torch.sigmoid(self.weight)
    # weight = [0.4, 0.8]
    outputs = weight[0] * output_x + weight[1] * output_y + output_xy
    return outputs

# 实例化网络模型
net = LiSTNet().to(device)

8.设置损失函数和优化器

from sklearn.metrics import classification_report,accuracy_score
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters(), lr=0.0001)
# scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer)

9.训练模型


total_loss = 0
for epoch in range(200):
    net.train()
    for i, (hsi, lidar, tr_labels) in enumerate(train_loader):
        hsi = hsi.to(device)
        lidar = lidar.to(device)
        tr_labels = tr_labels.to(device)
        # 优化器梯度归零
        optimizer.zero_grad()
        # 正向传播 + 反向传播 + 优化 
        output = net(hsi,lidar)
        loss = criterion(output, tr_labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    #print('[Epoch: %d] [loss avg: %.4f]   [current loss: %.4f]' %(epoch + 1, total_loss/(epoch+1), loss.item()))
    # scheduler.step(loss)
    net.eval()

    count = 0
    for hsi, lidar, gtlabels in test_loader:
      hsi = hsi.to(device)
      lidar = lidar.to(device)
      outputs = net(hsi,lidar)
      outputs = np.argmax(outputs.detach().cpu().numpy(), axis=1)
      if count == 0:
          y_pred_test =  outputs
          gty = gtlabels
          count = 1
      else:
          y_pred_test = np.concatenate( (y_pred_test, outputs) )#
          gty = np.concatenate( (gty, gtlabels) )
    acc1 = accuracy_score(gty,y_pred_test)  # 准确率的计算
    print('[Epoch: %d] [loss avg: %.4f]   [current loss: %.4f]' %(epoch + 1, total_loss/(epoch+1), loss.item()), ' acc: ', acc1)

print('Finished Training')

10.训练结果

截屏2022-03-20 18.38.48.png