在随机采样中,每个样本都是在原始的长序列上任意捕获的子序列。
在迭代过程中,来自两个相邻的、随机的、小批量中的子序列不一定在原始序列上相邻。
对于语言建模,目标是基于到目前为止我们看到的词元来预测下一个词元,
因此标签是移位了一个词元的原始序列。
下面的代码每次都从数据中随机生成一个小批量。
在这里,参数 batch_size =5指定了每个小批量中子序列样本的数目,
参数 num_steps=2 是每个子序列中预定义的时间步数。
def seq_data_iter_random(corpus, batch_size, num_steps):
"""使用随机抽样生成一个小批量子序列。"""
# 从随机偏移量开始对序列进行分区,随机范围包括`num_steps - 1`
corpus = corpus[random.randint(0, num_steps - 1):]
# 减去1,是因为我们需要考虑标签
num_subseqs = (len(corpus) - 1) // num_steps
# 长度为`num_steps`的子序列的起始索引
initial_indices = list(range(0, num_subseqs * num_steps, num_steps))
# 在随机抽样的迭代过程中,
# 来自两个相邻的、随机的、小批量中的子序列不一定在原始序列上相邻
random.shuffle(initial_indices)
def data(pos):
# 返回从`pos`位置开始的长度为`num_steps`的序列
return corpus[pos: pos + num_steps]
num_batches = num_subseqs // batch_size
for i in range(0, batch_size * num_batches, batch_size):
# 在这里,`initial_indices`包含子序列的随机起始索引
initial_indices_per_batch = initial_indices[i: i + batch_size]
X = [data(j) for j in initial_indices_per_batch]
Y = [data(j + 1) for j in initial_indices_per_batch]
yield torch.tensor(X), torch.tensor(Y)
my_seq = list(range(35))
for X, Y in d2l.seq_data_iter_random(my_seq, batch_size=2, num_steps=5):
print('X: ', X, '\nY:', Y)
X: tensor([[15, 16, 17, 18, 19],
[10, 11, 12, 13, 14]])
Y: tensor([[16, 17, 18, 19, 20],
[11, 12, 13, 14, 15]])
X: tensor([[ 5, 6, 7, 8, 9],
[20, 21, 22, 23, 24]])
Y: tensor([[ 6, 7, 8, 9, 10],
[21, 22, 23, 24, 25]])
X: tensor([[25, 26, 27, 28, 29],
[ 0, 1, 2, 3, 4]])
Y: tensor([[26, 27, 28, 29, 30],
[ 1, 2, 3, 4, 5]])
基于相同的设置,通过顺序分区读取每个小批量的子序列的特征 X 和标签 Y。
通过将它们打印出来可以注意到,
迭代期间来自两个相邻的小批量中的子序列在原始序列中确实是相邻的。
def seq_data_iter_sequential(corpus, batch_size, num_steps):
"""使用顺序分区生成一个小批量子序列。"""
# 从随机偏移量开始划分序列
offset = random.randint(0, num_steps)
num_tokens = ((len(corpus) - offset - 1) // batch_size) * batch_size
Xs = torch.tensor(corpus[offset: offset + num_tokens])
Ys = torch.tensor(corpus[offset + 1: offset + 1 + num_tokens])
Xs, Ys = Xs.reshape(batch_size, -1), Ys.reshape(batch_size, -1)
num_batches = Xs.shape[1] // num_steps
for i in range(0, num_steps * num_batches, num_steps):
X = Xs[:, i: i + num_steps]
Y = Ys[:, i: i + num_steps]
yield X, Y
for X, Y in d2l.seq_data_iter_sequential(my_seq, batch_size=2, num_steps=5):
print('X: ', X, '\nY:', Y)
X: tensor([[ 5, 6, 7, 8, 9],
[19, 20, 21, 22, 23]])
Y: tensor([[ 6, 7, 8, 9, 10],
[20, 21, 22, 23, 24]])
X: tensor([[10, 11, 12, 13, 14],
[24, 25, 26, 27, 28]])
Y: tensor([[11, 12, 13, 14, 15],
[25, 26, 27, 28, 29]])
现在,我们将上面的两个采样函数包装到一个类中,以便稍后可以将其用作数据迭代器。
class SeqDataLoader: #@save
"""加载序列数据的迭代器。"""
def __init__(self, batch_size, num_steps, use_random_iter, max_tokens):
if use_random_iter:
self.data_iter_fn = d2l.seq_data_iter_random
else:
self.data_iter_fn = d2l.seq_data_iter_sequential
self.corpus, self.vocab = d2l.load_corpus_time_machine(max_tokens)
self.batch_size, self.num_steps = batch_size, num_steps
def __iter__(self):
return self.data_iter_fn(self.corpus, self.batch_size, self.num_steps)
最后,我们定义了一个函数 load_data_time_machine ,
它同时返回数据迭代器和词汇表,
因此可以与其他带有 load_data 前缀的函数
(如 3.5节 中定义的 d2l.load_data_fashion_mnist )类似地使用。
def load_data_time_machine(batch_size, num_steps, #@save
use_random_iter=False, max_tokens=10000):
"""返回时光机器数据集的迭代器和词汇表。"""
data_iter = SeqDataLoader(
batch_size, num_steps, use_random_iter, max_tokens)
return data_iter, data_iter.vocab