使用torchaudio的语音命令识别

原文:https://pytorch.org/tutorials/intermediate/speech_command_recognition_with_torchaudio.html

本教程将向您展示如何正确设置音频数据集的格式,然后在数据集上训练/测试音频分类器网络。

Colab 提供了 GPU 选项。 在菜单选项卡中,选择“运行系统”,然后选择“更改运行系统类型”。 在随后的弹出窗口中,您可以选择 GPU。 更改之后,运行时应自动重新启动(这意味着来自已执行单元的信息会消失)。

首先,让我们导入常见的 Torch 包,例如torchaudio,可以按照网站上的说明进行安装。

  1. # Uncomment the following line to run in Google Colab
  2. # CPU:
  3. # !pip install torch==1.7.0+cpu torchvision==0.8.1+cpu torchaudio==0.7.0 -f https://download.pytorch.org/whl/torch_stable.html
  4. # GPU:
  5. # !pip install torch==1.7.0+cu101 torchvision==0.8.1+cu101 torchaudio==0.7.0 -f https://download.pytorch.org/whl/torch_stable.html
  6. # For interactive demo at the end:
  7. # !pip install pydub
  8. import torch
  9. import torch.nn as nn
  10. import torch.nn.functional as F
  11. import torch.optim as optim
  12. import torchaudio
  13. import matplotlib.pyplot as plt
  14. import IPython.display as ipd
  15. from tqdm.notebook import tqdm

让我们检查一下 CUDA GPU 是否可用,然后选择我们的设备。 在 GPU 上运行网络将大大减少训练/测试时间。

  1. device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  2. print(device)

导入数据集

我们使用torchaudio下载并表示数据集。 在这里,我们使用 SpeechCommands,它是由不同人员说出的 35 个命令的数据集。 数据集SPEECHCOMMANDS是数据集的torch.utils.data.Dataset版本。 在此数据集中,所有音频文件的长度约为 1 秒(因此约为 16000 个时间帧)。

实际的加载和格式化步骤是在访问数据点时发生的,torchaudio负责将音频文件转换为张量。 如果想直接加载音频文件,可以使用torchaudio.load()。 它返回一个包含新创建的张量的元组以及音频文件的采样频率(SpeechCommands为 16kHz)。

回到数据集,这里我们创建一个子类,将其分为标准训练,验证和测试子集。

  1. from torchaudio.datasets import SPEECHCOMMANDS
  2. import os
  3. class SubsetSC(SPEECHCOMMANDS):
  4. def __init__(self, subset: str = None):
  5. super().__init__("./", download=True)
  6. def load_list(filename):
  7. filepath = os.path.join(self._path, filename)
  8. with open(filepath) as fileobj:
  9. return [os.path.join(self._path, line.strip()) for line in fileobj]
  10. if subset == "validation":
  11. self._walker = load_list("validation_list.txt")
  12. elif subset == "testing":
  13. self._walker = load_list("testing_list.txt")
  14. elif subset == "training":
  15. excludes = load_list("validation_list.txt") + load_list("testing_list.txt")
  16. excludes = set(excludes)
  17. self._walker = [w for w in self._walker if w not in excludes]
  18. # Create training and testing split of the data. We do not use validation in this tutorial.
  19. train_set = SubsetSC("training")
  20. test_set = SubsetSC("testing")
  21. waveform, sample_rate, label, speaker_id, utterance_number = train_set[0]

SPEECHCOMMANDS数据集中的数据点是一个由波形(音频信号),采样率,发声(标签),讲话者的 ID,发声数组成的元组。

  1. print("Shape of waveform: {}".format(waveform.size()))
  2. print("Sample rate of waveform: {}".format(sample_rate))
  3. plt.plot(waveform.t().numpy());

让我们找到数据集中可用的标签列表。

  1. labels = sorted(list(set(datapoint[2] for datapoint in train_set)))
  2. labels

35 个音频标签是用户说的命令。 前几个文件是人们所说的marvin

  1. waveform_first, *_ = train_set[0]
  2. ipd.Audio(waveform_first.numpy(), rate=sample_rate)
  3. waveform_second, *_ = train_set[1]
  4. ipd.Audio(waveform_second.numpy(), rate=sample_rate)

最后一个文件是有人说“视觉”。

  1. waveform_last, *_ = train_set[-1]
  2. ipd.Audio(waveform_last.numpy(), rate=sample_rate)

格式化数据

这是将转换应用于数据的好地方。 对于波形,我们对音频进行下采样以进行更快的处理,而不会损失太多的分类能力。

我们无需在此应用其他转换。 对于某些数据集,通常必须通过沿通道维度取平均值或仅保留其中一个通道来减少通道数量(例如,从立体声到单声道)。 由于SpeechCommands使用单个通道进行音频,因此此处不需要。

  1. new_sample_rate = 8000
  2. transform = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=new_sample_rate)
  3. transformed = transform(waveform)
  4. ipd.Audio(transformed.numpy(), rate=new_sample_rate)

我们使用标签列表中的每个索引对每个单词进行编码。

  1. def label_to_index(word):
  2. # Return the position of the word in labels
  3. return torch.tensor(labels.index(word))
  4. def index_to_label(index):
  5. # Return the word corresponding to the index in labels
  6. # This is the inverse of label_to_index
  7. return labels[index]
  8. word_start = "yes"
  9. index = label_to_index(word_start)
  10. word_recovered = index_to_label(index)
  11. print(word_start, "-->", index, "-->", word_recovered)

为了将由录音和语音构成的数据点列表转换为该模型的两个成批张量,我们实现了整理函数,PyTorch DataLoader使用了该函数,允许我们分批迭代数据集。 有关使用整理函数的更多信息,请参见文档

在整理函数中,我们还应用了重采样和文本编码。

  1. def pad_sequence(batch):
  2. # Make all tensor in a batch the same length by padding with zeros
  3. batch = [item.t() for item in batch]
  4. batch = torch.nn.utils.rnn.pad_sequence(batch, batch_first=True, padding_value=0.)
  5. return batch.permute(0, 2, 1)
  6. def collate_fn(batch):
  7. # A data tuple has the form:
  8. # waveform, sample_rate, label, speaker_id, utterance_number
  9. tensors, targets = [], []
  10. # Gather in lists, and encode labels as indices
  11. for waveform, _, label, *_ in batch:
  12. tensors += [waveform]
  13. targets += [label_to_index(label)]
  14. # Group the list of tensors into a batched tensor
  15. tensors = pad_sequence(tensors)
  16. targets = torch.stack(targets)
  17. return tensors, targets
  18. batch_size = 256
  19. if device == "cuda":
  20. num_workers = 1
  21. pin_memory = True
  22. else:
  23. num_workers = 0
  24. pin_memory = False
  25. train_loader = torch.utils.data.DataLoader(
  26. train_set,
  27. batch_size=batch_size,
  28. shuffle=True,
  29. collate_fn=collate_fn,
  30. num_workers=num_workers,
  31. pin_memory=pin_memory,
  32. )
  33. test_loader = torch.utils.data.DataLoader(
  34. test_set,
  35. batch_size=batch_size,
  36. shuffle=False,
  37. drop_last=False,
  38. collate_fn=collate_fn,
  39. num_workers=num_workers,
  40. pin_memory=pin_memory,
  41. )

定义网络

在本教程中,我们将使用卷积神经网络来处理原始音频数据。 通常,更高级的转换将应用于音频数据,但是 CNN 可以用于准确处理原始数据。 具体架构是根据本文中描述的 M5 网络架构建模的。 模型处理原始音频数据的一个重要方面是其第一层过滤器的接收范围。 我们模型的第一个过滤器长度为 80,因此在处理以 8kHz 采样的音频时,接收场约为 10ms(而在 4kHz 时约为 20ms)。 此大小类似于语音处理应用,该应用通常使用 20ms 到 40ms 的接收域。

  1. class M5(nn.Module):
  2. def __init__(self, n_input=1, n_output=35, stride=16, n_channel=32):
  3. super().__init__()
  4. self.conv1 = nn.Conv1d(n_input, n_channel, kernel_size=80, stride=stride)
  5. self.bn1 = nn.BatchNorm1d(n_channel)
  6. self.pool1 = nn.MaxPool1d(4)
  7. self.conv2 = nn.Conv1d(n_channel, n_channel, kernel_size=3)
  8. self.bn2 = nn.BatchNorm1d(n_channel)
  9. self.pool2 = nn.MaxPool1d(4)
  10. self.conv3 = nn.Conv1d(n_channel, 2 * n_channel, kernel_size=3)
  11. self.bn3 = nn.BatchNorm1d(2 * n_channel)
  12. self.pool3 = nn.MaxPool1d(4)
  13. self.conv4 = nn.Conv1d(2 * n_channel, 2 * n_channel, kernel_size=3)
  14. self.bn4 = nn.BatchNorm1d(2 * n_channel)
  15. self.pool4 = nn.MaxPool1d(4)
  16. self.fc1 = nn.Linear(2 * n_channel, n_output)
  17. def forward(self, x):
  18. x = self.conv1(x)
  19. x = F.relu(self.bn1(x))
  20. x = self.pool1(x)
  21. x = self.conv2(x)
  22. x = F.relu(self.bn2(x))
  23. x = self.pool2(x)
  24. x = self.conv3(x)
  25. x = F.relu(self.bn3(x))
  26. x = self.pool3(x)
  27. x = self.conv4(x)
  28. x = F.relu(self.bn4(x))
  29. x = self.pool4(x)
  30. x = F.avg_pool1d(x, x.shape[-1])
  31. x = x.permute(0, 2, 1)
  32. x = self.fc1(x)
  33. return F.log_softmax(x, dim=2)
  34. model = M5(n_input=transformed.shape[0], n_output=len(labels))
  35. model.to(device)
  36. print(model)
  37. def count_parameters(model):
  38. return sum(p.numel() for p in model.parameters() if p.requires_grad)
  39. n = count_parameters(model)
  40. print("Number of parameters: %s" % n)

我们将使用与本文相同的优化技术,将权重衰减设置为 0.0001 的 Adam 优化器。 首先,我们将以 0.01 的学习率进行训练,但是在 20 个周期后的训练过程中,我们将使用scheduler将其降低到 0.001。

  1. optimizer = optim.Adam(model.parameters(), lr=0.01, weight_decay=0.0001)
  2. scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.1) # reduce the learning after 20 epochs by a factor of 10

训练和测试网络

现在,我们定义一个训练函数,它将训练数据输入模型中,并执行反向传播和优化步骤。 对于训练,我们将使用的损失是负对数可能性。 然后,在每个周期之后将对网络进行测试,以查看训练期间准确率如何变化。

  1. def train(model, epoch, log_interval):
  2. model.train()
  3. for batch_idx, (data, target) in enumerate(train_loader):
  4. data = data.to(device)
  5. target = target.to(device)
  6. # apply transform and model on whole batch directly on device
  7. data = transform(data)
  8. output = model(data)
  9. # negative log-likelihood for a tensor of size (batch x 1 x n_output)
  10. loss = F.nll_loss(output.squeeze(), target)
  11. optimizer.zero_grad()
  12. loss.backward()
  13. optimizer.step()
  14. # print training stats
  15. if batch_idx % log_interval == 0:
  16. print(f"Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} ({100\. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}")
  17. # update progress bar
  18. pbar.update(pbar_update)
  19. # record loss
  20. losses.append(loss.item())

现在我们有了训练函数,我们需要制作一个用于测试网络准确率的函数。 我们将模型设置为eval()模式,然后对测试数据集进行推断。 调用eval()将网络中所有模块中的训练变量设置为false。 某些层(例如批量归一化层和丢弃层)在训练期间的行为会有所不同,因此此步骤对于获取正确的结果至关重要。

  1. def number_of_correct(pred, target):
  2. # count number of correct predictions
  3. return pred.squeeze().eq(target).sum().item()
  4. def get_likely_index(tensor):
  5. # find most likely label index for each element in the batch
  6. return tensor.argmax(dim=-1)
  7. def test(model, epoch):
  8. model.eval()
  9. correct = 0
  10. for data, target in test_loader:
  11. data = data.to(device)
  12. target = target.to(device)
  13. # apply transform and model on whole batch directly on device
  14. data = transform(data)
  15. output = model(data)
  16. pred = get_likely_index(output)
  17. correct += number_of_correct(pred, target)
  18. # update progress bar
  19. pbar.update(pbar_update)
  20. print(f"\nTest Epoch: {epoch}\tAccuracy: {correct}/{len(test_loader.dataset)} ({100\. * correct / len(test_loader.dataset):.0f}%)\n")

最后,我们可以训练和测试网络。 我们将训练网络十个周期,然后降低学习率,再训练十个周期。 在每个周期之后将对网络进行测试,以查看训练过程中准确率如何变化。

  1. log_interval = 20
  2. n_epoch = 2
  3. pbar_update = 1 / (len(train_loader) + len(test_loader))
  4. losses = []
  5. # The transform needs to live on the same device as the model and the data.
  6. transform = transform.to(device)
  7. with tqdm(total=n_epoch) as pbar:
  8. for epoch in range(1, n_epoch + 1):
  9. train(model, epoch, log_interval)
  10. test(model, epoch)
  11. scheduler.step()
  12. # Let's plot the training loss versus the number of iteration.
  13. # plt.plot(losses);
  14. # plt.title("training loss");

2 个周期后,测试集的网络准确率应超过 65%,而 21 个周期后,网络应达到 85%。 让我们看一下训练集中的最后几句话,看看模型是如何做到的。

  1. def predict(tensor):
  2. # Use the model to predict the label of the waveform
  3. tensor = tensor.to(device)
  4. tensor = transform(tensor)
  5. tensor = model(tensor.unsqueeze(0))
  6. tensor = get_likely_index(tensor)
  7. tensor = index_to_label(tensor.squeeze())
  8. return tensor
  9. waveform, sample_rate, utterance, *_ = train_set[-1]
  10. ipd.Audio(waveform.numpy(), rate=sample_rate)
  11. print(f"Expected: {utterance}. Predicted: {predict(waveform)}.")

如果有一个示例,我们来寻找一个分类错误的示例。

  1. for i, (waveform, sample_rate, utterance, *_) in enumerate(test_set):
  2. output = predict(waveform)
  3. if output != utterance:
  4. ipd.Audio(waveform.numpy(), rate=sample_rate)
  5. print(f"Data point #{i}. Expected: {utterance}. Predicted: {output}.")
  6. break
  7. else:
  8. print("All examples in this dataset were correctly classified!")
  9. print("In this case, let's just look at the last data point")
  10. ipd.Audio(waveform.numpy(), rate=sample_rate)
  11. print(f"Data point #{i}. Expected: {utterance}. Predicted: {output}.")

随意尝试使用其中一个标签的自己的录音! 例如,使用 Colab,在执行下面的单元格时说“ Go”。 这将录制一秒钟的音频并尝试对其进行分类。

  1. from google.colab import output as colab_output
  2. from base64 import b64decode
  3. from io import BytesIO
  4. from pydub import AudioSegment
  5. RECORD = """
  6. const sleep = time => new Promise(resolve => setTimeout(resolve, time))
  7. const b2text = blob => new Promise(resolve => {
  8. const reader = new FileReader()
  9. reader.onloadend = e => resolve(e.srcElement.result)
  10. reader.readAsDataURL(blob)
  11. })
  12. var record = time => new Promise(async resolve => {
  13. stream = await navigator.mediaDevices.getUserMedia({ audio: true })
  14. recorder = new MediaRecorder(stream)
  15. chunks = []
  16. recorder.ondataavailable = e => chunks.push(e.data)
  17. recorder.start()
  18. await sleep(time)
  19. recorder.onstop = async ()=>{
  20. blob = new Blob(chunks)
  21. text = await b2text(blob)
  22. resolve(text)
  23. }
  24. recorder.stop()
  25. })
  26. """
  27. def record(seconds=1):
  28. display(ipd.Javascript(RECORD))
  29. print(f"Recording started for {seconds} seconds.")
  30. s = colab_output.eval_js("record(%d)" % (seconds * 1000))
  31. print("Recording ended.")
  32. b = b64decode(s.split(",")[1])
  33. fileformat = "wav"
  34. filename = f"_audio.{fileformat}"
  35. AudioSegment.from_file(BytesIO(b)).export(filename, format=fileformat)
  36. return torchaudio.load(filename)
  37. waveform, sample_rate = record()
  38. print(f"Predicted: {predict(waveform)}.")
  39. ipd.Audio(waveform.numpy(), rate=sample_rate)

总结

在本教程中,我们使用了torchaudio来加载数据集并对信号进行重新采样。 然后,我们定义了经过训练的神经网络,以识别给定命令。 还有其他数据预处理方法,例如找到梅尔频率倒谱系数(MFCC),可以减小数据集的大小。 此变换也可以在torchaudio中作为torchaudio.transforms.MFCC使用。

脚本的总运行时间:(0 分钟 0.000 秒)

下载 Python 源码:speech_command_recognition_with_torchaudio.py

下载 Jupyter 笔记本:speech_command_recognition_with_torchaudio.ipynb

由 Sphinx 画廊生成的画廊