When the number of time steps is large (or the time steps themselves are far apart), the gradients of a recurrent neural network tend to vanish or explode. Gradient clipping can cope with exploding gradients, but it does not solve the vanishing-gradient problem.
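For reference, the `d2lzh.grad_clipping` helper used in the training code below clips by the global gradient norm. A minimal sketch of the same idea (illustrative, not the library's exact source):

```python
import torch

def grad_clipping(params, theta, device):
    # Clip by the global L2 norm: if ||g|| > theta, scale every
    # gradient by theta / ||g|| so the overall norm becomes theta.
    norm = torch.tensor([0.0], device=device)
    for param in params:
        norm += (param.grad.data ** 2).sum()
    norm = norm.sqrt().item()
    if norm > theta:
        for param in params:
            param.grad.data *= (theta / norm)
```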
GRU
Reset Gate and Update Gate
Both the reset gate and the update gate take the current time step's input and the previous time step's hidden state as input; their outputs are computed by fully connected layers with a sigmoid activation.
The reset gate $R_t$ and update gate $Z_t$ are computed as:

$$R_t = \sigma(X_t W_{xr} + H_{t-1} W_{hr} + b_r)$$

$$Z_t = \sigma(X_t W_{xz} + H_{t-1} W_{hz} + b_z)$$

The computation itself is no different from a vanilla RNN's; because of the sigmoid, every element of the reset gate and the update gate lies in $(0, 1)$.
Candidate Hidden State
The GRU computes a candidate hidden state to assist the later hidden state computation. It multiplies the current time step's reset gate output elementwise with the previous time step's hidden state. If an element of the reset gate is close to 0, the corresponding hidden state element is reset to 0, i.e. the previous hidden state is discarded at that position; if it is close to 1, the previous hidden state is retained. The result of this elementwise multiplication is then concatenated with the current time step's input, and a fully connected layer with a tanh activation produces the candidate hidden state, all of whose elements lie in $[-1, 1]$.
The candidate hidden state at time step $t$ is computed as:

$$\tilde{H}_t = \tanh(X_t W_{xh} + (R_t \odot H_{t-1}) W_{hh} + b_h)$$
Hidden State
The final hidden state uses the current time step's update gate $Z_t$ to combine the previous hidden state with the current candidate hidden state:

$$H_t = Z_t \odot H_{t-1} + (1 - Z_t) \odot \tilde{H}_t$$
Note that the update gate is multiplied elementwise with the previous hidden state, which carries forward the parts of the state we want to keep; where we want the state updated, what flows through instead is the candidate hidden state $\tilde{H}_t$, weighted by $1 - Z_t$.
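A tiny numeric sketch of this gating behavior (toy values, purely illustrative):

```python
import torch

H_prev = torch.tensor([[1.0, -1.0]])   # previous hidden state
H_cand = torch.tensor([[0.5,  0.5]])   # candidate hidden state
Z = torch.tensor([[0.9, 0.1]])         # update gate output

# Element 0 (Z close to 1) mostly keeps the old state;
# element 1 (Z close to 0) mostly takes the candidate.
H = Z * H_prev + (1 - Z) * H_cand
print(H)  # tensor([[0.9500, 0.3500]])
```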
Putting the pieces together, a from-scratch implementation:
```python
import math
import time

import torch
from torch import nn

import d2lzh

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
corpus_indices, char_to_idx, idx_to_char, vocab_size = d2lzh.load_data_jay_lyrics()

num_inputs, num_hiddens, num_outputs = vocab_size, 256, vocab_size


def one_hot(x, n_class, dtype=torch.float32):
    # Encode a batch of indices as one-hot rows.
    x = x.long()
    res = torch.zeros(x.shape[0], n_class, dtype=dtype, device=device)
    res.scatter_(1, x.view(-1, 1), 1)  # in-place scatter_, not scatter
    return res


def to_onehot(x, n_class):
    # Split (batch_size, num_steps) into a list of num_steps one-hot matrices.
    return [one_hot(x[:, i], n_class) for i in range(x.shape[1])]


def get_params():
    def _norm(shape):
        return torch.nn.Parameter(torch.normal(0, 0.01, shape, device=device),
                                  requires_grad=True)

    def _zero(shape):
        return torch.nn.Parameter(torch.zeros(shape, device=device),
                                  requires_grad=True)

    def _three():
        return (_norm((num_inputs, num_hiddens)),
                _norm((num_hiddens, num_hiddens)),
                _zero(num_hiddens))

    W_xr, W_hr, b_r = _three()  # reset gate parameters
    W_xz, W_hz, b_z = _three()  # update gate parameters
    W_xh, W_hh, b_h = _three()  # candidate hidden state parameters
    W_hq = _norm((num_hiddens, num_outputs))  # output layer
    b_q = _zero(num_outputs)
    return nn.ParameterList([W_xz, W_hz, b_z, W_xr, W_hr, b_r,
                             W_xh, W_hh, b_h, W_hq, b_q])


def init_gru_state(batch_size, num_hiddens):
    return torch.zeros((batch_size, num_hiddens), device=device)


def gru(inputs, state, params):
    W_xz, W_hz, b_z, W_xr, W_hr, b_r, W_xh, W_hh, b_h, W_hq, b_q = params
    H = state
    outputs = []
    for X in inputs:
        Z = torch.sigmoid(torch.matmul(X, W_xz) + torch.matmul(H, W_hz) + b_z)
        R = torch.sigmoid(torch.matmul(X, W_xr) + torch.matmul(H, W_hr) + b_r)
        # (R ⊙ H_{t-1}) W_hh, matching the candidate-state formula above
        H_tilde = torch.tanh(torch.matmul(X, W_xh)
                             + torch.matmul(R * H, W_hh) + b_h)
        H = Z * H + (1 - Z) * H_tilde
        outputs.append(torch.matmul(H, W_hq) + b_q)
    return outputs, H


def predict_rnn(prefix, num_chars, params):
    state = init_gru_state(1, num_hiddens)
    outputs = [char_to_idx[prefix[0]]]
    for step in range(len(prefix) + num_chars - 1):
        X = to_onehot(torch.tensor([[outputs[-1]]], device=device), vocab_size)
        output, state = gru(X, state, params)
        if step < len(prefix) - 1:
            # Still consuming the prefix: feed the known next character.
            outputs.append(char_to_idx[prefix[step + 1]])
        else:
            # Generating: take the most likely next character.
            outputs.append(int(output[0].argmax(dim=1).item()))
    return ''.join([idx_to_char[i] for i in outputs])


def train_and_predict_rnn(is_random_iter, num_epochs, num_steps, lr,
                          clipping_theta, batch_size, pred_period,
                          pred_len, prefixes):
    params = get_params()
    if is_random_iter:
        data_iter_fn = d2lzh.data_iter_random
    else:
        data_iter_fn = d2lzh.data_iter_consecutive
    loss = nn.CrossEntropyLoss()

    for epoch in range(num_epochs):
        data_iter = data_iter_fn(corpus_indices, batch_size, num_steps, device)
        if not is_random_iter:
            state = init_gru_state(batch_size, num_hiddens)
        l_sum, n, start = 0.0, 0, time.time()
        for X, Y in data_iter:
            if is_random_iter:
                state = init_gru_state(batch_size, num_hiddens)
            else:
                state.detach_()
            inputs = to_onehot(X, vocab_size)
            outputs, state = gru(inputs, state, params)
            outputs = torch.cat(outputs, dim=0)
            # contiguous() is required: transpose does not change the
            # underlying memory layout, and view() needs contiguous memory.
            y = torch.transpose(Y, 0, 1).contiguous().view(-1)
            l = loss(outputs, y.long())
            l.backward()
            d2lzh.grad_clipping(params, clipping_theta, device)
            for param in params:
                param.data -= param.grad * lr
            # Zeroing the gradients works either between the loss and
            # backward() or after the SGD step.
            for param in params:
                param.grad.data.zero_()
            l_sum += l.item() * y.shape[0]
            n += y.shape[0]
        if (epoch + 1) % pred_period == 0:
            print('epoch %d, perplexity %f, time %.2f sec'
                  % (epoch + 1, math.exp(l_sum / n), time.time() - start))
            for prefix in prefixes:
                print(' -', predict_rnn(prefix, pred_len, params))


if __name__ == '__main__':
    num_epochs, num_steps, batch_size, lr, clipping_theta = 160, 35, 32, 1e2, 1e-2
    pred_period, pred_len, prefixes = 40, 50, ['分开', '不分开']
    train_and_predict_rnn(False, num_epochs, num_steps, lr, clipping_theta,
                          batch_size, pred_period, pred_len, prefixes)
```
Concise implementation:

```python
lr = 1e-2  # note: the learning rate needs to be much smaller here
gru_layer = nn.GRU(input_size=vocab_size, hidden_size=num_hiddens)
model = d2lzh.RNNModel(gru_layer, vocab_size).to(device)
d2lzh.train_and_predict_rnn_pytorch(model, num_hiddens, vocab_size, device,
                                    corpus_indices, idx_to_char, char_to_idx,
                                    num_epochs, num_steps, lr, clipping_theta,
                                    batch_size, pred_period, pred_len, prefixes)
```
LSTM
Input Gate, Forget Gate, and Output Gate
This part is quite similar to the GRU, with one extra gate.
The gates are computed as:

$$I_t = \sigma(X_t W_{xi} + H_{t-1} W_{hi} + b_i)$$

$$F_t = \sigma(X_t W_{xf} + H_{t-1} W_{hf} + b_f)$$

$$O_t = \sigma(X_t W_{xo} + H_{t-1} W_{ho} + b_o)$$
Candidate Memory Cell
Unlike the GRU, the candidate memory cell is obtained directly from the input and the hidden state, with no reset gate involved:

$$\tilde{C}_t = \tanh(X_t W_{xc} + H_{t-1} W_{hc} + b_c)$$
Memory Cell
The current time step's memory cell combines information from the previous memory cell and the current candidate memory cell, with the flow of information controlled by the forget gate and the input gate:

$$C_t = F_t \odot C_{t-1} + I_t \odot \tilde{C}_t$$
Hidden State
Finally, the hidden state is produced from the memory cell via the output gate:

$$H_t = O_t \odot \tanh(C_t)$$
Code
```python
import torch
from torch import nn

import d2lzh

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
corpus_indices, char_to_idx, idx_to_char, vocab_size = d2lzh.load_data_jay_lyrics()

num_inputs, num_hiddens, num_outputs = vocab_size, 256, vocab_size


def get_params():
    def _norm(shape):
        return torch.nn.Parameter(torch.normal(0, 0.01, shape, device=device),
                                  requires_grad=True)

    def _zero(shape):
        return torch.nn.Parameter(torch.zeros(shape, device=device),
                                  requires_grad=True)

    def _three():
        return (_norm((num_inputs, num_hiddens)),
                _norm((num_hiddens, num_hiddens)),
                _zero(num_hiddens))

    W_xf, W_hf, b_f = _three()  # forget gate
    W_xi, W_hi, b_i = _three()  # input gate
    W_xo, W_ho, b_o = _three()  # output gate
    W_xc, W_hc, b_c = _three()  # candidate memory cell
    W_hq = _norm((num_hiddens, num_outputs))  # output layer
    b_q = _zero(num_outputs)
    return nn.ParameterList([W_xi, W_hi, b_i, W_xf, W_hf, b_f,
                             W_xo, W_ho, b_o, W_xc, W_hc, b_c, W_hq, b_q])


def init_lstm_state(batch_size, num_hiddens, device):
    # The LSTM state is a pair: (hidden state H, memory cell C).
    return (torch.zeros((batch_size, num_hiddens), device=device),
            torch.zeros((batch_size, num_hiddens), device=device))


def lstm(inputs, state, params):
    (W_xi, W_hi, b_i, W_xf, W_hf, b_f, W_xo, W_ho, b_o,
     W_xc, W_hc, b_c, W_hq, b_q) = params
    (H, C) = state
    outputs = []
    for X in inputs:
        F = torch.sigmoid(torch.matmul(X, W_xf) + torch.matmul(H, W_hf) + b_f)
        I = torch.sigmoid(torch.matmul(X, W_xi) + torch.matmul(H, W_hi) + b_i)
        O = torch.sigmoid(torch.matmul(X, W_xo) + torch.matmul(H, W_ho) + b_o)
        C_tilde = torch.tanh(torch.matmul(X, W_xc) + torch.matmul(H, W_hc) + b_c)
        C = F * C + I * C_tilde
        H = O * torch.tanh(C)
        outputs.append(torch.matmul(H, W_hq) + b_q)
    return outputs, (H, C)


if __name__ == '__main__':
    num_epochs, num_steps, batch_size, lr, clipping_theta = 160, 35, 32, 1e2, 1e-2
    pred_period, pred_len, prefixes = 40, 50, ['分开', '不分开']
    d2lzh.train_and_predict_rnn(lstm, get_params, init_lstm_state, num_hiddens,
                                vocab_size, device, corpus_indices, idx_to_char,
                                char_to_idx, False, num_epochs, num_steps, lr,
                                clipping_theta, batch_size, pred_period,
                                pred_len, prefixes)
```
Concise implementation:
```python
lr = 1e-2  # note: the learning rate needs to be much smaller here
lstm_layer = nn.LSTM(input_size=vocab_size, hidden_size=num_hiddens)
model = d2lzh.RNNModel(lstm_layer, vocab_size).to(device)
d2lzh.train_and_predict_rnn_pytorch(model, num_hiddens, vocab_size, device,
                                    corpus_indices, idx_to_char, char_to_idx,
                                    num_epochs, num_steps, lr, clipping_theta,
                                    batch_size, pred_period, pred_len, prefixes)
```
Deep Recurrent Neural Networks
The hidden state of the first hidden layer is computed exactly as before:

$$H_t^{(1)} = \phi(X_t W_{xh}^{(1)} + H_{t-1}^{(1)} W_{hh}^{(1)} + b_h^{(1)})$$
The hidden state of the $l$-th hidden layer takes the previous layer's hidden state as its input:

$$H_t^{(l)} = \phi(H_t^{(l-1)} W_{xh}^{(l)} + H_{t-1}^{(l)} W_{hh}^{(l)} + b_h^{(l)})$$
It is easy to see that the state a deep RNN returns at each time step stacks one hidden state per layer: shape (num_layers, num_hiddens) for a single example, or (num_layers, batch_size, num_hiddens) in PyTorch's batched convention.
Finally, the output layer only needs the hidden state of the $L$-th (last) hidden layer:

$$O_t = H_t^{(L)} W_{hq} + b_q$$
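A quick shape check with PyTorch's built-in multi-layer `nn.GRU` confirms this (toy sizes, purely illustrative):

```python
import torch
from torch import nn

num_steps, batch_size, input_size, num_hiddens, num_layers = 5, 3, 8, 16, 2
rnn = nn.GRU(input_size=input_size, hidden_size=num_hiddens,
             num_layers=num_layers)
X = torch.randn(num_steps, batch_size, input_size)
output, state = rnn(X)
print(output.shape)  # torch.Size([5, 3, 16]) -- top layer only, every step
print(state.shape)   # torch.Size([2, 3, 16]) -- one state per layer
```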
EncoderDecoder
This needs little explanation: the encoder-decoder architecture handles input and output sequences of unequal length, and the intermediate state is a representation of the input sequence in a semantic space.
```python
import torch
from torch import nn


class Encoder(nn.Module):
    """Base encoder interface: maps an input sequence to outputs/state."""
    def __init__(self):
        super().__init__()

    def forward(self, X):
        raise NotImplementedError


class Decoder(nn.Module):
    """Base decoder interface: turns encoder outputs into an initial
    state, then decodes step by step."""
    def __init__(self):
        super().__init__()

    def init_state(self, enc_outputs):
        raise NotImplementedError

    def forward(self, dec_x, state):
        raise NotImplementedError


class EncoderDecoder(nn.Module):
    """Wire an encoder and a decoder together."""
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, encoder_x, decoder_x):
        encode_output = self.encoder(encoder_x)
        state = self.decoder.init_state(encode_output)
        return self.decoder(decoder_x, state)
```
Seq2Seq Learning
In the training dataset, we can append the special symbol "&lt;eos&gt;" (end of sequence) to every sentence to mark where the sequence terminates; correspondingly, the decoder's input is prefixed with the begin-of-sequence symbol "&lt;bos&gt;", as the training code below does.
Encoder
The encoder's role is to transform a variable-length input sequence into a fixed-length context variable $c$ that encodes the information of the input sequence. A recurrent neural network is the usual choice of encoder.
The encoder transforms the hidden states of all time steps into the context variable through a customizable function $q$:

$$c = q(h_1, \ldots, h_T)$$
For example, we can choose $q(h_1, \ldots, h_T) = h_T$, so that the context variable is simply the hidden state of the input sequence's final time step.
Note: since the encoder may be a deep RNN, the state's size along dim=0 may be greater than 1, which would prevent it from being concatenated with the sequence and fed into the decoder later. The context variable $c$ can therefore simply take the last row of the state tensor, which corresponds to the deep RNN's last hidden layer.
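A small shape sketch of this trick (toy sizes; `repeat` is how the decoder below tiles the context across time steps):

```python
import torch
from torch import nn

num_steps, batch_size, embed_size, num_hiddens, num_layers = 7, 4, 8, 16, 2
rnn = nn.GRU(input_size=embed_size, hidden_size=num_hiddens,
             num_layers=num_layers)
X = torch.randn(num_steps, batch_size, embed_size)
output, state = rnn(X)

context = state[-1]                      # (batch, hidden): last layer only
print(context.shape)                     # torch.Size([4, 16])
tiled = context.repeat(num_steps, 1, 1)  # (steps, batch, hidden)
print(tiled.shape)                       # torch.Size([7, 4, 16])
```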
```python
class Seq2SeqEncoder(nn.Module):
    def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
                 dropout=0):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.rnn = nn.GRU(input_size=embed_size, hidden_size=num_hiddens,
                          num_layers=num_layers, dropout=dropout)

    def forward(self, input, *args):
        # The embedding maps token indices to dense vectors (adding a
        # dimension); every index must be smaller than vocab_size.
        X = self.embedding(input)
        # Reshape to (num_steps, batch_size, embed_size).
        X = X.permute(1, 0, 2)
        output, state = self.rnn(X)
        return (output, state)
```
Decoder
The context variable $c$ output by the encoder encodes the information of the entire input sequence. Given the output sequence of a training sample, for each time step $t'$ (the symbol differs from the input/encoder time step $t$, since it is shifted by one), the decoder's output is a conditional probability based on the previous outputs and the context variable $c$, i.e. $P(y_{t'} \mid y_1, \ldots, y_{t'-1}, c)$.
The decoder takes the previous time step's output and the context variable $c$ as input, and transforms them together with the previous hidden state into the current time step's hidden state:

$$s_{t'} = g(y_{t'-1}, c, s_{t'-1})$$
With the decoder's hidden state in hand, we can use a custom output layer and a softmax operation to compute $P(y_{t'} \mid y_1, \ldots, y_{t'-1}, c)$, i.e. a distribution over the vocabulary from which the next token is chosen.
```python
class Seq2SeqDecoder(nn.Module):
    def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
                 dropout=0):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.rnn = nn.GRU(input_size=embed_size + num_hiddens,
                          hidden_size=num_hiddens, num_layers=num_layers,
                          dropout=dropout)
        self.dense = nn.Linear(num_hiddens, vocab_size)

    def init_state(self, encoder_outputs, *args):
        # encoder_outputs is (output, state); keep only the state.
        return encoder_outputs[1]

    def forward(self, input, state):
        X = self.embedding(input)
        X = X.permute(1, 0, 2)
        # Build the context so its time dimension matches X's.
        context = state[-1].repeat(X.shape[0], 1, 1)
        # Concatenate context and X along the embedding dimension.
        X_and_context = torch.cat((X, context), dim=2)
        Y, state = self.rnn(X_and_context, state)
        output = self.dense(Y).permute(1, 0, 2)
        return output, state
```
Training
In training, the mean of the losses over all output time steps is usually taken as the loss function to minimize. At prediction time, we must feed the decoder's output at the previous time step back in as its input at the current time step. During training, by contrast, we can instead feed the label sequence (the ground-truth output sequence from the training set), shifted by one time step, as the decoder input. This is called teacher forcing.
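In code, teacher forcing amounts to prepending the `<bos>` token and dropping the last label token, exactly as the training loop below does. A toy sketch (the index values are made up for illustration):

```python
import torch

bos_idx = 2                       # hypothetical index of '<bos>'
Y = torch.tensor([[5, 9, 4, 3],   # ground-truth target tokens,
                  [7, 8, 3, 0]])  # shape (batch_size, num_steps)

bos = torch.full((Y.shape[0], 1), bos_idx)
dec_input = torch.cat([bos, Y[:, :-1]], dim=1)  # shift right by one
print(dec_input)
# tensor([[2, 5, 9, 4],
#         [2, 7, 8, 3]])
```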
The complete training pipeline, loss masking included:
```python
import torch
from torch import nn
import d2l.torch as d2l

# Seq2SeqEncoder and Seq2SeqDecoder are the classes defined above.


def sequence_mask(X, valid_len, value=0):
    """Mask irrelevant entries in sequences.

    Inputs are padded to a fixed length (here 10), but sentences differ
    in length, so entries beyond each sentence's valid length are zeroed.
    """
    maxlen = X.size(1)
    mask = torch.arange((maxlen), dtype=torch.float32,
                        device=X.device)[None, :] < valid_len[:, None]
    X[~mask] = value
    return X


# @save
class MaskedSoftmaxCELoss(nn.CrossEntropyLoss):
    """The softmax cross-entropy loss with masks."""
    # `pred` shape: (`batch_size`, `num_steps`, `vocab_size`)
    # `label` shape: (`batch_size`, `num_steps`)
    # `valid_len` shape: (`batch_size`,)
    def forward(self, pred, label, valid_len):
        weights = torch.ones_like(label)
        weights = sequence_mask(weights, valid_len)
        self.reduction = 'none'
        unweighted_loss = super(MaskedSoftmaxCELoss, self).forward(
            pred.permute(0, 2, 1), label)
        # After masking, losses beyond each valid length become 0;
        # only the valid part of every sequence contributes.
        weighted_loss = (unweighted_loss * weights).mean(dim=1)
        return weighted_loss


# @save
def train_seq2seq(net, data_iter, lr, num_epochs, tgt_vocab, device):
    """Train a model for sequence to sequence."""
    def xavier_init_weights(m):
        if type(m) == nn.Linear:
            nn.init.xavier_uniform_(m.weight)
        if type(m) == nn.GRU:
            for param in m._flat_weights_names:
                if "weight" in param:
                    nn.init.xavier_uniform_(m._parameters[param])

    net.apply(xavier_init_weights)
    net.to(device)
    optimizer = torch.optim.Adam(net.parameters(), lr=lr)
    loss = MaskedSoftmaxCELoss()
    net.train()
    animator = d2l.Animator(xlabel='epoch', ylabel='loss',
                            xlim=[10, num_epochs])
    for epoch in range(num_epochs):
        timer = d2l.Timer()
        metric = d2l.Accumulator(2)  # Sum of training loss, no. of tokens
        for batch in data_iter:
            optimizer.zero_grad()
            X, X_valid_len, Y, Y_valid_len = [x.to(device) for x in batch]
            bos = torch.tensor([tgt_vocab['<bos>']] * Y.shape[0],
                               device=device).reshape(-1, 1)
            dec_input = torch.cat([bos, Y[:, :-1]], 1)  # Teacher forcing
            Y_hat, _ = net(X, dec_input, X_valid_len)
            l = loss(Y_hat, Y, Y_valid_len)
            l.sum().backward()  # Make the loss scalar for `backward`
            d2l.grad_clipping(net, 1)
            num_tokens = Y_valid_len.sum()
            optimizer.step()
            with torch.no_grad():
                metric.add(l.sum(), num_tokens)
        if (epoch + 1) % 10 == 0:
            animator.add(epoch + 1, (metric[0] / metric[1],))
    print(f'loss {metric[0] / metric[1]:.3f}, {metric[1] / timer.stop():.1f} '
          f'tokens/sec on {str(device)}')


embed_size, num_hiddens, num_layers, dropout = 32, 32, 2, 0.1
batch_size, num_steps = 64, 10
lr, num_epochs, device = 0.005, 300, d2l.try_gpu()

train_iter, src_vocab, tgt_vocab = d2l.load_data_nmt(batch_size, num_steps)
encoder = Seq2SeqEncoder(len(src_vocab), embed_size, num_hiddens, num_layers,
                         dropout)
decoder = Seq2SeqDecoder(len(tgt_vocab), embed_size, num_hiddens, num_layers,
                         dropout)
net = d2l.EncoderDecoder(encoder, decoder)
train_seq2seq(net, train_iter, lr, num_epochs, tgt_vocab, device)
```
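At prediction time there is no teacher forcing: the decoder's previous output is fed back as its next input. A minimal greedy-decoding sketch under that scheme, assuming the d2l `Vocab` API (list indexing, `to_tokens`, `<bos>`/`<eos>` entries); this mirrors, but is not identical to, d2l's own `predict_seq2seq`:

```python
def predict_seq2seq_greedy(net, src_sentence, src_vocab, tgt_vocab,
                           num_steps, device):
    # Greedy decoding: at each step, feed the most likely previous
    # output token back in as the next input.
    net.eval()
    src_tokens = src_vocab[src_sentence.lower().split(' ')] + [
        src_vocab['<eos>']]
    enc_X = torch.unsqueeze(
        torch.tensor(src_tokens, dtype=torch.long, device=device), dim=0)
    enc_outputs = net.encoder(enc_X)
    dec_state = net.decoder.init_state(enc_outputs)
    dec_X = torch.tensor([[tgt_vocab['<bos>']]], device=device)
    output_seq = []
    for _ in range(num_steps):
        Y, dec_state = net.decoder(dec_X, dec_state)
        dec_X = Y.argmax(dim=2)          # greedy: most likely token
        pred = dec_X.squeeze(dim=0).type(torch.int32).item()
        if pred == tgt_vocab['<eos>']:   # stop at end-of-sequence
            break
        output_seq.append(pred)
    return ' '.join(tgt_vocab.to_tokens(output_seq))
```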
