代码
import copy
import datetime
import os

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from sklearn.preprocessing import MinMaxScaler


class LSTMRegression(nn.Module):
    """LSTM model for time-series prediction."""

    def __init__(self, input_size, hidden_size, num_layers, output_size=1):
        super().__init__()
        # LSTM backbone; an nn.RNN with the same arguments could be swapped
        # in here for comparison.
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers)
        self.fc = nn.Linear(hidden_size, output_size)
        # Bookkeeping filled in during training so forecasting can continue
        # from the last observed window (persisted alongside the weights).
        self.last_time = None  # timestamp of the last training sample
        self.seq_step = []     # last `input_size` scaled values from training

    def forward(self, _x):
        # _x shape: (seq_len, batch, input_size)
        x, _ = self.lstm(_x)
        x = self.fc(x)
        return x


class LSTMProduct:
    """Train an LSTM on hourly 'level' readings and forecast future values."""

    def __init__(self, seq_step=5, hidden_size=12, num_layers=2):
        self.seq_step = seq_step
        self.sate = None
        self.lstm_model = LSTMRegression(seq_step, hidden_size=hidden_size,
                                         num_layers=num_layers)
        if os.path.exists('./parameter.pkl'):
            # Inference mode: a checkpoint exists, restore trained weights.
            self.sate = torch.load("./parameter.pkl")
            self.lstm_model.load_state_dict(self.sate["model"])

    def load_data(self, path):
        """Load the CSV at *path* and resample it to an hourly series.

        :param path: CSV file path; must contain 'time' and 'level' columns.
            Resampling guards against irregular time spacing in the raw data.
        :return: (DatetimeIndex, ndarray of hourly mean 'level' values)
        """
        df = pd.read_csv(path)
        df['time'] = pd.to_datetime(df['time'])
        # Sort chronologically before resampling.
        df.sort_values(by='time', ascending=True, inplace=True)
        df2 = df.set_index(keys='time')
        # Hourly resample, averaging readings that fall in the same hour.
        df3 = df2.resample("1H").mean()
        return df3.index, df3['level'].values

    def create_dataset(self, data):
        """Build a sliding-window supervised dataset.

        :param data: tuple (index, values)
        :return: (X of shape (n, seq_step), y of shape (n,), list of y timestamps)
        """
        index, value = data
        dataset_x, dataset_y, dataset_y_index = [], [], []
        for i in range(len(value) - self.seq_step):
            dataset_x.append(value[i:(i + self.seq_step)])
            dataset_y.append(value[i + self.seq_step])
            dataset_y_index.append(index[i + self.seq_step])
        return np.array(dataset_x), np.array(dataset_y), dataset_y_index

    def data_parser(self, data):
        """Scale values to [-1, 1]; unscaled data makes the loss very large.

        The fitted scaler is persisted so predictions can be inverse-scaled.
        :param data: tuple (index, values)
        :return: (index, scaled values)
        """
        index, value = data
        scaler = MinMaxScaler(feature_range=(-1, 1))
        scaler_value = scaler.fit_transform(value.reshape(-1, 1)).reshape(-1, )
        joblib.dump(scaler, "scaler.model")
        return index, scaler_value

    def result_data_parser(self, out):
        """Map model output back to the original value range."""
        scaler = joblib.load("scaler.model")
        data = scaler.inverse_transform(
            np.array(out.view(-1).tolist()).reshape(-1, 1))
        return data

    def product(self, train_x):
        """Run one prediction pass.

        :return: (inverse-scaled predictions, raw scaled outputs as a list)
        """
        out = self.lstm_model(train_x)
        scaler = joblib.load("scaler.model")
        scaler_value = scaler.inverse_transform(
            np.array(out.tolist()).reshape(-1, 1)).reshape(-1, )
        return scaler_value.reshape(-1, ), out.view(-1).tolist()

    def train_model(self, train_data, epoch=5000, lr=0.01):
        """Fit the LSTM, checkpointing the best (lowest-loss) state.

        Stops early after 3 consecutive epochs with negligible improvement.
        """
        train_x, train_y = train_data
        # nn.MSELoss() defaults to mean reduction; the original
        # `reduce=True, size_average=True` arguments are deprecated/removed.
        loss_function = nn.MSELoss()
        optimizer = torch.optim.Adam(self.lstm_model.parameters(), lr=lr)
        min_loss = 100
        flag = 0
        for i in range(epoch):
            out = self.lstm_model(train_x)
            loss = loss_function(out, train_y)
            # Backpropagate, step, then clear gradients for the next epoch.
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
            loss_value = loss.item()
            if (i + 1) % 100 == 0:
                print('Epoch: {}, Loss:{:.6f}'.format(i + 1, loss_value))
            if min_loss > loss_value:
                # New best: snapshot weights plus the forecasting bookkeeping.
                self.sate = {"model": self.lstm_model.state_dict(),
                             "last_seq": self.lstm_model.seq_step,
                             "last_time": self.lstm_model.last_time}
            # Count consecutive epochs without meaningful improvement.
            if min_loss - loss_value <= 1e-8:
                flag += 1
            else:
                flag = 0
            if flag >= 3:
                torch.save(self.sate, './parameter.pkl')
                break
            # Fix: the original assigned `min_loss = loss_value` unconditionally,
            # so it tracked the *previous* epoch's loss, not the minimum.
            min_loss = min(min_loss, loss_value)
        torch.save(self.sate, './parameter.pkl')

    def train(self, data_path='./train_data.csv'):
        """Load data, scale it, and train the model. Returns self."""
        data = self.load_data(data_path)
        parser_data = self.data_parser(data)
        # Remember the final timestamp and the last window of scaled values so
        # forecast() can continue where the training data ends.
        self.lstm_model.last_time = parser_data[0][-1]
        self.lstm_model.seq_step = parser_data[-1][-self.seq_step:].tolist()
        dataset = self.create_dataset(parser_data)
        # The RNN consumes (seq_len, batch, feature): seq_len == 1, and each
        # sample's window acts as the feature vector.
        train_x = dataset[0].reshape(1, -1, self.seq_step)
        train_y = dataset[1].reshape(1, -1, 1)
        train_x = torch.from_numpy(train_x).to(torch.float32)
        train_y = torch.from_numpy(train_y).to(torch.float32)
        self.train_model((train_x, train_y))
        return self

    def get_mse(self, data_1, data_2):
        """Return the mean squared error between two sequences."""
        input_ = torch.from_numpy(np.array(data_1))
        output = torch.from_numpy(np.array(data_2))
        # Same deprecated-argument fix as in train_model.
        loss_function = nn.MSELoss()
        return loss_function(input_, output).item()

    def forecast(self, num=5):
        """Autoregressively predict *num* future hourly values.

        :return: list of (timestamp string, predicted value) tuples.
        """
        forecast_list = []
        # Copy so the rolling window does not mutate the saved state.
        seq_step = copy.deepcopy(self.sate.get("last_seq"))
        last_time = self.sate.get("last_time")
        for _ in range(num):
            window = np.array(seq_step[-self.seq_step:])
            train_x = torch.from_numpy(
                window.reshape(1, -1, self.seq_step)).to(torch.float32)
            product_data, scalar_product_data = self.product(train_x)
            last_time = last_time + np.timedelta64(1, 'h')
            forecast_list.append(
                (last_time.strftime("%Y-%m-%d %H:%M:%S"), product_data[0]))
            # Feed the scaled prediction back in as the newest observation.
            seq_step.extend(scalar_product_data)
        return forecast_list

    def test(self):
        # The next 24 hours of ground truth, used to measure test error.
        data_1 = [165, 142, 136, 147, 170, 196, 217, 226, 224, 217, 213, 217,
                  233, 262, 302, 347, 387, 415, 423, 410, 377, 333, 283, 235]
        data = [i[-1] for i in self.forecast(24)]
        print(self.get_mse(data, data_1))


if __name__ == '__main__':
    lstm_model = LSTMProduct(seq_step=60, hidden_size=2, num_layers=1)
    # lstm_model.train(data_path="./train_data.csv")
    lstm_model.test()
    print(lstm_model.forecast(24))
数据
train_data.csv