栏目分类:
子分类:
返回
文库吧用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
文库吧 > IT > 前沿技术 > 人工智能 > 语音识别

用 Python 训练自己的语音识别系统,这波操作稳了

语音识别 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

近几年来语音识别技术得到了迅速发展,从手机中的Siri语音智能助手、微软的小娜以及各种平台的智能音箱等等,各种语音识别的项目得到了广泛应用。

语音识别属于感知智能,而让机器从简单的识别语音到理解语音,则上升到了认知智能层面,机器的自然语言理解能力如何,也成为了其是否有智慧的标志,而自然语言理解正是目前难点。

同时考虑到目前大多数的语音识别平台都是借助于智能云,对于语音识别的训练对于大多数人而言还较为神秘,故今天我们将利用python搭建自己的语音识别系统。

最终模型的识别效果如下:





实验前的准备

首先我们使用的python版本是3.6.5所用到的库有cv2库用来图像处理;

Numpy库用来矩阵运算;Keras框架用来训练和加载模型。Librosa和python_speech_features库用于提取音频特征。Glob和pickle库用来读取本地数据集。



数据集准备
首先数据集使用的是清华大学的thchs30中文数据。

这些录音根据其文本内容分成了四部分,A(句子的ID是1~250),B(句子的ID是251~500),C(501~750),D(751~1000)。ABC三组包括30个人的10893句发音,用来做训练,D包括10个人的2496句发音,用来做测试。

data文件夹中包含(.wav文件和.trn文件;trn文件里存放的是.wav文件的文字描述:第一行为词,第二行为拼音,第三行为音素);

数据集如下:





模型训练

1、提取语音数据集的MFCC特征:
首先人的声音是通过声道产生的,声道的形状决定了发出怎样的声音。如果我们可以准确的知道这个形状,那么我们就可以对产生的音素进行准确的描述。声道的形状在语音短时功率谱的包络中显示出来。而MFCCs就是一种准确描述这个包络的一种特征。

其中提取的MFCC特征如下图可见。



故我们在读取数据集的基础上,要将其语音特征提取存储以方便加载入神经网络进行训练。

其对应的代码如下:

  1. #读取数据集文件  
  2. text_paths = glob.glob('data/*.trn')  
  3. total = len(text_paths)  
  4. print(total)  
  5. with open(text_paths[0], 'r', encoding='utf8') as fr:  
  6. lines = fr.readlines  
  7. print(lines)  
  8. #数据集文件trn内容读取保存到数组中  
  9. texts =   
  10. paths =   
  11. for path in text_paths:  
  12. with open(path, 'r', encoding='utf8') as fr:  
  13. lines = fr.readlines  
  14. line = lines[0].strip('n').replace(' ', '')  
  15. texts.append(line)  
  16. paths.append(path.rstrip('.trn'))  
  17. print(paths[0], texts[0])  
  18. #定义mfcc数  
  19. mfcc_dim = 13  
  20. #根据数据集标定的音素读入  
  21. def load_and_trim(path):  
  22. audio, sr = librosa.load(path)  
  23. energy = librosa.feature.rmse(audio)  
  24. frames = np.nonzero(energy >= np.max(energy) / 5)  
  25. indices = librosa.core.frames_to_samples(frames)[1]  
  26. audio = audio[indices[0]:indices[-1]] if indices.size else audio[0:0]  
  27. return audio, sr  
  28. #提取音频特征并存储  
  29. features =   
  30. for i in tqdm(range(total)):  
  31. path = paths[i]  
  32. audio, sr = load_and_trim(path)  
  33. features.append(mfcc(audio, sr, numcep=mfcc_dim, nfft=551))  
  34. print(len(features), features[0].shape) 

2、神经网络预处理:
在进行神经网络加载训练前,我们需要对读取的MFCC特征进行归一化,主要目的是为了加快收敛,提高效果和减少干扰。然后处理好数据集和标签定义输入和输出即可。

对应代码如下:

  1. #随机选择100个数据集  
  2. samples = random.sample(features, 100)  
  3. samples = np.vstack(samples)  
  4. #平均MFCC的值为了归一化处理  
  5. mfcc_mean = np.mean(samples, axis=0)  
  6. #计算标准差为了归一化  
  7. mfcc_std = np.std(samples, axis=0)  
  8. print(mfcc_mean)  
  9. print(mfcc_std)  
  10. #归一化特征  
  11. features = [(feature - mfcc_mean) / (mfcc_std + 1e-14) for feature in features]  
  12. #将数据集读入的标签和对应id存储列表  
  13. chars = {}  
  14. for text in texts:  
  15. for c in text:  
  16. chars[c] = chars.get(c, 0) + 1  
  17. chars = sorted(chars.items, key=lambda x: x[1], reverse=True)  
  18. chars = [char[0] for char in chars]  
  19. print(len(chars), chars[:100])  
  20. char2id = {c: i for i, c in enumerate(chars)}  
  21. id2char = {i: c for i, c in enumerate(chars)}  
  22. data_index = np.arange(total)  
  23. np.random.shuffle(data_index)  
  24. train_size = int(0.9 * total)  
  25. test_size = total - train_size  
  26. train_index = data_index[:train_size]  
  27. test_index = data_index[train_size:]  
  28. #神经网络输入和输出X,Y的读入数据集特征  
  29. X_train = [features[i] for i in train_index]  
  30. Y_train = [texts[i] for i in train_index]  
  31. X_test = [features[i] for i in test_index]  
  32. Y_test = [texts[i] for i in test_index] 

3、神经网络函数定义:
其中包括训练的批次,卷积层函数、标准化函数、激活层函数等等。

其中第⼀个维度为⼩⽚段的个数,原始语⾳越长,第⼀个维度也越⼤, 第⼆个维度为 MFCC 特征的维度。得到原始语⾳的数值表⽰后,就可以使⽤ WaveNet 实现。由于 MFCC 特征为⼀维序列,所以使⽤ Conv1D 进⾏卷积。 因果是指,卷积的输出只和当前位置之前的输⼊有关,即不使⽤未来的 特征,可以理解为将卷积的位置向前偏移。WaveNet 模型结构如下所⽰:



具体如下可见:

  1. batch_size = 16  
  2. #定义训练批次的产生,一次训练16个  
  3. def batch_generator(x, y, batch_size=batch_size):  
  4. offset = 0  
  5. while True:  
  6. offset += batch_size  
  7. if offset == batch_size or offset >= len(x):  
  8. data_index = np.arange(len(x))  
  9. np.random.shuffle(data_index)  
  10. x = [x[i] for i in data_index]  
  11. y = [y[i] for i in data_index]  
  12. offset = batch_size  
  13. X_data = x[offset - batch_size: offset]  
  14. Y_data = y[offset - batch_size: offset]  
  15. X_maxlen = max([X_data[i].shape[0] for i in range(batch_size)])  
  16. Y_maxlen = max([len(Y_data[i]) for i in range(batch_size)])  
  17. X_batch = np.zeros([batch_size, X_maxlen, mfcc_dim])  
  18. Y_batch = np.ones([batch_size, Y_maxlen]) * len(char2id)  
  19. X_length = np.zeros([batch_size, 1], dtype='int32')  
  20. Y_length = np.zeros([batch_size, 1], dtype='int32')  
  21. for i in range(batch_size):  
  22. X_length[i, 0] = X_data[i].shape[0]  
  23. X_batch[i, :X_length[i, 0], :] = X_data[i]  
  24. Y_length[i, 0] = len(Y_data[i])  
  25. Y_batch[i, :Y_length[i, 0]] = [char2id[c] for c in Y_data[i]]  
  26. inputs = {'X': X_batch, 'Y': Y_batch, 'X_length': X_length, 'Y_length': Y_length}  
  27. outputs = {'ctc': np.zeros([batch_size])}  
  28. epochs = 50  
  29. num_blocks = 3  
  30. filters = 128  
  31. X = Input(shape=(None, mfcc_dim,), dtype='float32', name='X')  
  32. Y = Input(shape=(None,), dtype='float32', name='Y')  
  33. X_length = Input(shape=(1,), dtype='int32', name='X_length')  
  34. Y_length = Input(shape=(1,), dtype='int32', name='Y_length')  
  35. #卷积1层  
  36. def conv1d(inputs, filters, kernel_size, dilation_rate):  
  37. return Conv1D(filters=filters, kernel_size=kernel_size, strides=1, padding='causal', activation=None,  
  38. dilation_rate=dilation_rate)(inputs)  
  39. #标准化函数  
  40. def batchnorm(inputs):  
  41. return BatchNormalization(inputs)  
  42. #激活层函数  
  43. def activation(inputs, activation):  
  44. return Activation(activation)(inputs)  
  45. #全连接层函数  
  46. def res_block(inputs, filters, kernel_size, dilation_rate):  
  47. hf = activation(batchnorm(conv1d(inputs, filters, kernel_size, dilation_rate)), 'tanh')  
  48. hg = activation(batchnorm(conv1d(inputs, filters, kernel_size, dilation_rate)), 'sigmoid')  
  49. h0 = Multiply([hf, hg])  
  50. ha = activation(batchnorm(conv1d(h0, filters, 1, 1)), 'tanh')  
  51. hs = activation(batchnorm(conv1d(h0, filters, 1, 1)), 'tanh')  
  52. return Add([ha, inputs]), hs  
  53. h0 = activation(batchnorm(conv1d(X, filters, 1, 1)), 'tanh')  
  54. shortcut =   
  55. for i in range(num_blocks):  
  56. for r in [1, 2, 4, 8, 16]:  
  57. h0, s = res_block(h0, filters, 7, r)  
  58. shortcut.append(s)  
  59. h1 = activation(Add(shortcut), 'relu')  
  60. h1 = activation(batchnorm(conv1d(h1, filters, 1, 1)), 'relu')  
  61. #softmax损失函数输出结果  
  62. Y_pred = activation(batchnorm(conv1d(h1, len(char2id) + 1, 1, 1)), 'softmax')  
  63. sub_model = Model(inputs=X, outputs=Y_pred)  
  64. #计算损失函数  
  65. def calc_ctc_loss(args):  
  66. y, yp, ypl, yl = args  
  67. return K.ctc_batch_cost(y, yp, ypl, yl) 

4、模型的训练:
训练的过程如下可见:





 

  1. ctc_loss = Lambda(calc_ctc_loss, output_shape=(1,), name='ctc')([Y, Y_pred, X_length, Y_length])  
  2. #加载模型训练  
  3. model = Model(inputs=[X, Y, X_length, Y_length], outputs=ctc_loss)  
  4. #建立优化器  
  5. optimizer = SGD(lr=0.02, momentum=0.9, nesterov=True, clipnorm=5)  
  6. #激活模型开始计算  
  7. model.compile(loss={'ctc': lambda ctc_true, ctc_pred: ctc_pred}, optimizer=optimizer)  
  8. checkpointer = ModelCheckpoint(filepath='asr.h5', verbose=0)  
  9. lr_decay = ReduceLRonPlateau(monitor='loss', factor=0.2, patience=1, min_lr=0.000)  
  10. #开始训练  
  11. history = model.fit_generator(  
  12. generator=batch_generator(X_train, Y_train),  
  13. steps_per_epoch=len(X_train) // batch_size,  
  14. epochs=epochs,  
  15. validation_data=batch_generator(X_test, Y_test),  
  16. validation_steps=len(X_test) // batch_size,  
  17. callbacks=[checkpointer, lr_decay])  
  18. #保存模型  
  19. sub_model.save('asr.h5')  
  20. #将字保存在pl=pkl中  
  21. with open('dictionary.pkl', 'wb') as fw:  
  22. pickle.dump([char2id, id2char, mfcc_mean, mfcc_std], fw)  
  23. train_loss = history.history['loss']  
  24. valid_loss = history.history['val_loss']  
  25. plt.plot(np.linspace(1, epochs, epochs), train_loss, label='train')  
  26. plt.plot(np.linspace(1, epochs, epochs), valid_loss, label='valid')  
  27. plt.legend(loc='upper right')  
  28. plt.xlabel('Epoch')  
  29. plt.ylabel('Loss')  
  30. plt.show 



测试模型
读取我们语音数据集生成的字典,通过调用模型来对音频特征识别。

代码如下:

  1. wavs = glob.glob('A2_103.wav')  
  2. print(wavs)  
  3. with open('dictionary.pkl', 'rb') as fr:  
  4. [char2id, id2char, mfcc_mean, mfcc_std] = pickle.load(fr)  
  5. mfcc_dim = 13  
  6. model = load_model('asr.h5')  
  7. index = np.random.randint(len(wavs))  
  8. print(wavs[index])  
  9. audio, sr = librosa.load(wavs[index])  
  10. energy = librosa.feature.rmse(audio)  
  11. frames = np.nonzero(energy >= np.max(energy) / 5)  
  12. indices = librosa.core.frames_to_samples(frames)[1]  
  13. audio = audio[indices[0]:indices[-1]] if indices.size else audio[0:0]  
  14. X_data = mfcc(audio, sr, numcep=mfcc_dim, nfft=551)  
  15. X_data = (X_data - mfcc_mean) / (mfcc_std + 1e-14)  
  16. print(X_data.shape)  
  17. pred = model.predict(np.expand_dims(X_data, axis=0))  
  18. pred_ids = K.eval(K.ctc_decode(pred, [X_data.shape[0]], greedy=False, beam_width=10, top_paths=1)[0][0])  
  19. pred_ids = pred_ids.flatten.tolist  
  20. print(''.join([id2char[i] for i in pred_ids]))  
  21. yield (inputs, outputs) 

到这里,我们整体的程序就搭建完成,下面为我们程序的运行结果:



源码地址:

https://pan.baidu.com/s/1tFlZkMJmrMTD05cd_zxmAg

提取码:ndrr

数据集需要自行下载。

 

转载请注明:文章转载自 www.wk8.com.cn
本文地址:https://www.wk8.com.cn/it/795927.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 wk8.com.cn

ICP备案号:晋ICP备2021003244-6号