使用Tensorflow构建自己的语音识别模型

语音识别在许多行业都是一个复杂的问题。了解有关处理音频数据以及如何对声音样本进行分类的一些基础知识对丰富个人能力是件很有益的事。
本文我们将通过一个使用Tensorflow对一些声音剪辑进行分类的例子,帮助你了解足够的基础知识,从而能够构建自己的语音识别模型。另外,你也可以通过进一步的学习,将这些概念应用到更大、更复杂的音频文件中。
本案例的完整代码可以在GitHub上获取。

一:获取数据

数据收集是数据科学中的难题之一。虽然有很多可用的数据,但并不是所有的数据都容易用于机器学习问题。因此必须确保数据是干净的、有标签的和完整的。
为了实现本次案例,我们将使用Google发布的一些音频文件,可以在Github上获取。
首先,我们将创建一个新的Conducto管道。在这里,您可以构建,训练和测试模型,并与其他感兴趣的人共享链接:

  1. ###
  2. # Main Pipeline
  3. ###
  4. def main() -> co.Serial:
  5. path = "/conducto/data/pipeline"
  6. root = co.Serial(image = get_image())
  7. # Get data from keras for testing and training
  8. root["Get Data"] = co.Exec(run_whole_thing, f"{path}/raw")
  9. return root

然后,开始编写 run_whole_thing 功能:

  1. def run_whole_thing(out_dir):
  2. os.makedirs(out_dir, exist_ok=True)
  3. # Set seed for experiment reproducibility
  4. seed = 55
  5. tf.random.set_seed(seed)
  6. np.random.seed(seed)
  7. data_dir = pathlib.Path("data/mini_speech_commands")

接下来,设置目录以保存音频文件:

  1. if not data_dir.exists():
  2. # Get the files from external source and put them in an accessible directory
  3. tf.keras.utils.get_file(
  4. 'mini_speech_commands.zip',
  5. origin="http://storage.googleapis.com/download.tensorflow.org/data/mini_speech_commands.zip",
  6. extract=True)

预处理数据

现在将数据保存在正确的目录中,可以将其拆分为训练、测试和验证数据集。
首先,我们需要编写一些函数来帮助预处理数据,以使其可以在我们的模型中起作用。
我们需要算法能够理解的数据格式。我们将使用卷积神经网络,所以数据需要转换成图像。
第一个函数将把二进制音频文件转换成一个张量:

  1. # Convert the binary audio file to a tensor
  2. def decode_audio(audio_binary):
  3. audio, _ = tf.audio.decode_wav(audio_binary)
  4. return tf.squeeze(audio, axis=-1)

由于我们有一个具有原始数据的张量,所以我们需要得到匹配它们的标签。这就是下面的函数通过从文件路径获取音频文件的标签功能:

  1. # Get the label (yes, no, up, down, etc) for an audio file.
  2. def get_label(file_path):
  3. parts = tf.strings.split(file_path, os.path.sep)
  4. return parts[-2]

接下来,我们需要将音频文件与正确的标签相关联。执行此操作并返回一个可与 Tensorflow配合使用的元组:

  1. # Create a tuple that has the labeled audio files
  2. def get_waveform_and_label(file_path):
  3. label = get_label(file_path)
  4. audio_binary = tf.io.read_file(file_path)
  5. waveform = decode_audio(audio_binary)
  6. return waveform, label

前面我们简要提到了使用卷积神经网络(CNN)算法。这是我们处理语音识别模型的方法之一。通常CNN在图像数据上工作得很好,有助于减少预处理时间。
我们要利用这一点,把音频文件转换成频谱图。频谱图是频率频谱的图像。如果查看一个音频文件,你会发现它只是频率数据。因此,我们要写一个将音频数据转换成图像的函数:

  1. # Convert audio files to images
  2. def get_spectrogram(waveform):
  3. # Padding for files with less than 16000 samples
  4. zero_padding = tf.zeros([16000] - tf.shape(waveform), dtype=tf.float32)
  5. # Concatenate audio with padding so that all audio clips will be of the same length
  6. waveform = tf.cast(waveform, tf.float32)
  7. equal_length = tf.concat([waveform, zero_padding], 0)
  8. spectrogram = tf.signal.stft(
  9. equal_length, frame_length=255, frame_step=128)
  10. spectrogram = tf.abs(spectrogram)
  11. return spectrogram

现在我们已经将数据格式化为图像,我们需要将正确的标签应用于这些图像。这与我们制作原始音频文件的做法类似:

  1. # Label the images created from the audio files and return a tuple
  2. def get_spectrogram_and_label_id(audio, label):
  3. spectrogram = get_spectrogram(audio)
  4. spectrogram = tf.expand_dims(spectrogram, -1)
  5. label_id = tf.argmax(label == commands)
  6. return spectrogram, label_id

我们需要的最后一个 helper 函数将处理传递给它的任何音频文件集的所有上述操作:

  1. # Preprocess any audio files
  2. def preprocess_dataset(files, autotune, commands):
  3. # Creates the dataset
  4. files_ds = tf.data.Dataset.from_tensor_slices(files)
  5. # Matches audio files with correct labels
  6. output_ds = files_ds.map(get_waveform_and_label,
  7. num_parallel_calls=autotune)
  8. # Matches audio file images to the correct labels
  9. output_ds = output_ds.map(
  10. get_spectrogram_and_label_id, num_parallel_calls=autotune)
  11. return output_ds

当已经有了所有这些辅助函数,我们就可以分割数据了。

将数据拆分为数据集

将音频文件转换为图像有助于使用CNN更容易处理数据,这就是我们编写所有这些帮助函数的原因。我们将做一些事情来简化数据的分割。
首先,我们将获得所有音频文件的潜在命令列表,我们将在代码的其他地方使用这些命令:

  1. # Get all of the commands for the audio files
  2. commands = np.array(tf.io.gfile.listdir(str(data_dir)))
  3. commands = commands[commands != 'README.md']

然后我们将得到数据目录中所有文件的列表,并对其进行混洗,以便为每个需要的数据集分配随机值:

  1. # Get a list of all the files in the directory
  2. filenames = tf.io.gfile.glob(str(data_dir) + '/*/*')
  3. # Shuffle the file names so that random bunches can be used as the training, testing, and validation sets
  4. filenames = tf.random.shuffle(filenames)
  5. # Create the list of files for training data
  6. train_files = filenames[:6400]
  7. # Create the list of files for validation data
  8. validation_files = filenames[6400: 6400 + 800]
  9. # Create the list of files for test data
  10. test_files = filenames[-800:]

现在,我们已经清晰地将培训、验证和测试文件分开,这样我们就可以继续对这些文件进行预处理,使它们为构建和测试模型做好准备。这里使用autotune来在运行时动态调整参数的值:

  1. autotune = tf.data.AUTOTUNE

第一个示例只是为了展示预处理的工作原理,它给了一些我们需要的spectrogram_ds值:

  1. # Get the converted audio files for training the model
  2. files_ds = tf.data.Dataset.from_tensor_slices(train_files)
  3. waveform_ds = files_ds.map(
  4. get_waveform_and_label, num_parallel_calls=autotune)
  5. spectrogram_ds = waveform_ds.map(
  6. get_spectrogram_and_label_id, num_parallel_calls=autotune)

既然已经了解了预处理的步骤过程,我们可以继续使用helper函数来处理所有数据集:

  1. # Preprocess the training, test, and validation datasets
  2. train_ds = preprocess_dataset(train_files, autotune, commands)
  3. validation_ds = preprocess_dataset(
  4. validation_files, autotune, commands)
  5. test_ds = preprocess_dataset(test_files, autotune, commands)

我们要设置一些训练示例,这些训练示例在每个时期的迭代中运行,因此我们将设置批处理大小:

  1. # Batch datasets for training and validation
  2. batch_size = 64
  3. train_ds = train_ds.batch(batch_size)
  4. validation_ds = validation_ds.batch(batch_size)

最后,我们可以利用缓存来减少训练模型时的延迟:

  1. # Reduce latency while training
  2. train_ds = train_ds.cache().prefetch(autotune)
  3. validation_ds = validation_ds.cache().prefetch(autotune)

最终,我们的数据集采用了可以训练模型的形式。

建立模型

由于数据集已明确定义,所以我们可以继续构建模型。我们将使用CNN创建模型,因此我们需要获取数据的形状以获取适用于我们图层的正确形状,然后我们继续按顺序构建模型:

  1. # Build model
  2. for spectrogram, _ in spectrogram_ds.take(1):
  3. input_shape = spectrogram.shape
  4. num_labels = len(commands)
  5. norm_layer = preprocessing.Normalization()
  6. norm_layer.adapt(spectrogram_ds.map(lambda x, _: x))
  7. model = models.Sequential([
  8. layers.Input(shape=input_shape),
  9. preprocessing.Resizing(32, 32),
  10. norm_layer,
  11. layers.Conv2D(32, 3, activation='relu'),
  12. layers.Conv2D(64, 3, activation='relu'),
  13. layers.MaxPooling2D(),
  14. layers.Dropout(0.25),
  15. layers.Flatten(),
  16. layers.Dense(128, activation='relu'),
  17. layers.Dropout(0.5),
  18. layers.Dense(num_labels),
  19. ])
  20. model.summary()

我们在模型上做了一些配置,以便给我们最好的准确性:

  1. # Configure built model with losses and metrics
  2. model.compile(
  3. optimizer=tf.keras.optimizers.Adam(),
  4. loss=tf.keras.losses.SparseCategoricalCrossentropy(
  5. from_logits=True),
  6. metrics=['accuracy'],
  7. )

模型建立好了,现在剩下的就是训练它了。

训练模型

在所有的工作都对数据进行预处理和建立模型之后,训练就相对简单了。我们确定要使用训练和验证数据集运行多少个周期:

  1. # Finally train the model and return info about each epoch
  2. EPOCHS = 10
  3. model.fit(
  4. train_ds,
  5. validation_data=validation_ds,
  6. epochs=EPOCHS,
  7. callbacks=tf.keras.callbacks.EarlyStopping(verbose=1, patience=2),
  8. )

这样这个模型就已经训练好了,现在需要对它进行测试。

测试模型

现在我们有了一个准确率约为83%的模型,是时候测试它在新数据上的表现了。所以我们使用测试数据集并将音频文件从标签中分离出来:

  1. # Test the model
  2. test_audio = []
  3. test_labels = []
  4. for audio, label in test_ds:
  5. test_audio.append(audio.numpy())
  6. test_labels.append(label.numpy())
  7. test_audio = np.array(test_audio)
  8. test_labels = np.array(test_labels)

然后我们获取音频数据并在我们的模型中使用它,看看它是否预测了正确的标签:

  1. # See how accurate the model is when making predictions on the test dataset
  2. y_pred = np.argmax(model.predict(test_audio), axis=1)
  3. y_true = test_labels
  4. test_acc = sum(y_pred == y_true) / len(y_true)
  5. print(f'Test set accuracy: {test_acc:.0%}')

完成管道

只需要编写一小段代码就可以完成您的管道并使其与任何人共享。这定义了将在Conducto管道中使用的图像,并处理文件执行:

  1. ###
  2. # Pipeline Helper functions
  3. ###
  4. def get_image():
  5. return co.Image(
  6. "python:3.8-slim",
  7. copy_dir=".",
  8. reqs_py=["conducto", "tensorflow", "keras"],
  9. )
  10. if __name__ == "__main__":
  11. co.main(default=main)

现在,你可以在终端中运行python pipeline.py——它应该会启动一个到新Conducto管道的链接。

结论

这是解决音频处理问题的方法之一,但是根据要分析的数据,它可能要复杂得多。如果将其构建在管道中,可以很轻松地与同事共享并在遇到错误时获得帮助或反馈。