分享

Python3使用科大讯飞API接口实现音频文件转写

 npkaida 2020-02-12

注意事项

科大讯飞语音转写 API 文档链接: https://www./doc/asr/lfasr/API.html.
科大讯飞语音转写Python3的demo下载链接:http://xfyun-doc.ufile./1564736425808301/weblfasr_python3_demo.zip
上一篇写了用百度智能云进行音频文件转写的博客,但是那个效果啊,有点惨不忍睹,至少我的识别结果是这样。然后转而使用了一下科大讯飞的,想着科大讯飞专门做语音相关的这一块,应该会好些。语音转写的Python3的demo代码确实很不错,函数接口很简洁,本文代码都是这个demo里面的。识别准确率还是可以的,而且不需要像百度那样整点才开始识别,很快就返回了识别结果。
如果你的录音是不止一个人,而是像电话录音那种,想把转写结果中不同人说的话分离出来,请按照下面这样添加预处理参数(demo中默认是没有添加这儿最后两个参数的,不添加的话,默认是不进行角色分离的):
在这里插入图片描述
这样的话,转写结果的speaker的值就不全是0了,而是根据不同的人对转写结果进行分离:
在这里插入图片描述

操作系统:Windows
Python:3.6
可用时长: 免费用户时长5小时,且用且珍惜。
音频属性: 采样率16k或8k、位长8bits或16bits、单声道&多声道
音频格式: wav/flac/opus/m4a/mp3
音频大小: 不超过500M
音频时长: 不超过5小时,建议5分钟以上
语言种类: 中文普通话、英文
转写结果保存时长 30天。(同一通录音不需要重新上传识别,如果你已经上传识别过了,之后只需要使用api.get_result_request(taskid)的方式即可再次获取识别结果,taskid是你第一次上传录音时给你分配的任务ID,避免重复上传浪费可用时长)

APP_ID, SECRET_KEY的获取

讯飞的好像不需要API_KEY,开放授权的方式和其他大厂的类似:
1、页面右上方“控制台”点击进入,登录讯飞账号(没有就注册一个),进入讯飞开放平台。
2、左侧导航栏上方,依次选择 语音识别->语音转写->离线语音转写识别。
3、服务申请。点击“创建应用”,“接口选择”已默认勾选完成,如无其他需求,无需勾选,完成其他资料后,点击最下方“立即创建”按钮。自己可以手动领取5小时免费试用体验包。
4、应用成功则页面显示“创建完毕”,点击”返回应用列表”, 查看新创建应用详情,在服务接口认证信息窗口就可以看到返回的AppID,SecretKey。

话不多说,直接上代码了

# -*- coding: utf-8 -*-# # author: yanmeng2# # 非实时转写调用demoimport base64import hashlibimport hmacimport jsonimport osimport timeimport requestslfasr_host = 'http://raasr./api'# 请求的接口名api_prepare = '/prepare'api_upload = '/upload'api_merge = '/merge'api_get_progress = '/getProgress'api_get_result = '/getResult'# 文件分片大小10Mfile_piece_sice = 10485760# ——————————————————转写可配置参数————————————————# 参数可在官网界面(https://doc./rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html)查看,根据需求可自行在gene_params方法里添加修改# 转写类型lfasr_type = 0# 是否开启分词has_participle = 'false'has_seperate = 'true'# 多候选词个数max_alternatives = 0# 子用户标识suid = ''class SliceIdGenerator: '''slice id生成器''' def __init__(self): self.__ch = 'aaaaaaaaa`' def getNextSliceId(self): ch = self.__ch j = len(ch) - 1 while j >= 0: cj = ch[j] if cj != 'z': ch = ch[:j] + chr(ord(cj) + 1) + ch[j + 1:] break else: ch = ch[:j] + 'a' + ch[j + 1:] j = j - 1 self.__ch = ch return self.__chclass RequestApi(object): def __init__(self, appid, secret_key, upload_file_path): self.appid = appid self.secret_key = secret_key self.upload_file_path = upload_file_path # 根据不同的apiname生成不同的参数,本示例中未使用全部参数您可在官网(https://doc./rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html)查看后选择适合业务场景的进行更换 def gene_params(self, apiname, taskid=None, slice_id=None): appid = self.appid secret_key = self.secret_key upload_file_path = self.upload_file_path ts = str(int(time.time())) m2 = hashlib.md5() m2.update((appid + ts).encode('utf-8')) md5 = m2.hexdigest() md5 = bytes(md5, encoding='utf-8') # 以secret_key为key, 上面的md5为msg, 使用hashlib.sha1加密结果为signa signa = hmac.new(secret_key.encode('utf-8'), md5, hashlib.sha1).digest() signa = base64.b64encode(signa) signa = str(signa, 'utf-8') file_len = os.path.getsize(upload_file_path) file_name = os.path.basename(upload_file_path) param_dict = {} if apiname == api_prepare: # slice_num是指分片数量,如果您使用的音频都是较短音频也可以不分片,直接将slice_num指定为1即可 slice_num = int(file_len / file_piece_sice) + (0 if (file_len % file_piece_sice == 0) else 1) param_dict['app_id'] = appid param_dict['signa'] = signa param_dict['ts'] = ts param_dict['file_len'] = str(file_len) param_dict['file_name'] = file_name param_dict['slice_num'] = str(slice_num) elif apiname == api_upload: param_dict['app_id'] = appid param_dict['signa'] = signa param_dict['ts'] = ts param_dict['task_id'] = taskid param_dict['slice_id'] = slice_id elif apiname == api_merge: param_dict['app_id'] = appid param_dict['signa'] = signa param_dict['ts'] = ts param_dict['task_id'] = taskid param_dict['file_name'] = file_name elif apiname == api_get_progress or apiname == api_get_result: param_dict['app_id'] = appid param_dict['signa'] = signa param_dict['ts'] = ts param_dict['task_id'] = taskid return param_dict # 请求和结果解析,结果中各个字段的含义可参考:https://doc./rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html def gene_request(self, apiname, data, files=None, headers=None): response = requests.post(lfasr_host + apiname, data=data, files=files, headers=headers) result = json.loads(response.text) if result['ok'] == 0: print('{} success:'.format(apiname) + str(result)) return result else: print('{} error:'.format(apiname) + str(result)) exit(0) return result # 预处理 def prepare_request(self): return self.gene_request(apiname=api_prepare, data=self.gene_params(api_prepare)) # 上传 def upload_request(self, taskid, upload_file_path): file_object = open(upload_file_path, 'rb') try: index = 1 sig = SliceIdGenerator() while True: content = file_object.read(file_piece_sice) if not content or len(content) == 0: break files = { 'filename': self.gene_params(api_upload).get('slice_id'), 'content': content } response = self.gene_request(api_upload, data=self.gene_params(api_upload, taskid=taskid, slice_id=sig.getNextSliceId()), files=files) if response.get('ok') != 0: # 上传分片失败 print('upload slice fail, response: ' + str(response)) return False print('upload slice ' + str(index) + ' success') index += 1 finally: 'file index:' + str(file_object.tell()) file_object.close() return True # 合并 def merge_request(self, taskid): return self.gene_request(api_merge, data=self.gene_params(api_merge, taskid=taskid)) # 获取进度 def get_progress_request(self, taskid): return self.gene_request(api_get_progress, data=self.gene_params(api_get_progress, taskid=taskid)) # 获取结果 def get_result_request(self, taskid): return self.gene_request(api_get_result, data=self.gene_params(api_get_result, taskid=taskid)) def all_api_request(self): # 1. 预处理 pre_result = self.prepare_request() taskid = pre_result['data'] # 2 . 分片上传 self.upload_request(taskid=taskid, upload_file_path=self.upload_file_path) # 3 . 文件合并 self.merge_request(taskid=taskid) # 4 . 获取任务进度 while True: # 每隔20秒获取一次任务进度 progress = self.get_progress_request(taskid) progress_dic = progress if progress_dic['err_no'] != 0 and progress_dic['err_no'] != 26605: print('task error: ' + progress_dic['failed']) return else: data = progress_dic['data'] task_status = json.loads(data) if task_status['status'] == 9: print('task ' + taskid + ' finished') break print('The task ' + taskid + ' is in processing, task status: ' + str(data)) # 每次获取进度间隔20S time.sleep(20) # 5 . 获取结果 self.get_result_request(taskid=taskid)# 注意:如果出现requests模块报错:'NoneType' object has no attribute 'read', 请尝试将requests模块更新到2.20.0或以上版本(本demo测试版本为2.20.0)# 输入讯飞开放平台的appid,secret_key和待转写的文件路径if __name__ == '__main__': APP_ID = '***' SECRET_KEY = '****' file_path = r'***.wav' api = RequestApi(appid=APP_ID, secret_key=SECRET_KEY, upload_file_path=file_path) api.all_api_request()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210

运行结果像这样的

当然,你可以根据自己的需求对demo进行改进,比如你想并发识别录音,你可以添加多线程执行的函数,为了获取taskid方便,我在class的初始化里边添加了self.taskid = “None”,并在预处理结果返回之后重新对taskid赋值。
在这里插入图片描述

def thread_func(wav_file_path, txt_file_path):  # 线程函数,方便并发识别录音    doc = open(txt_file_path, 'w', encoding='utf-8')    # doc.close()    api = RequestApi(appid=APP_ID, secret_key=SECRET_KEY, upload_file_path=wav_file_path)    api.all_api_request()  # demo中这个函数是完整过程执行,但我把提取结果的模块提出来了    print('taskid is: ' + api.taskid, file=doc)    result = api.get_result_request(api.taskid)    result = eval(result['data'])    # print(result)    for x in result:        print(x)        print(x, file=doc)    doc.close()#主函数写成类似这种if __name__ == '__main__':    file_list = [        'o2019082112552587460156',        'o2019082115552587460127'    ]    APP_ID = '***'    SECRET_KEY = '***'    file_read_path = r'D:\MyProject\Python\Voice_SDK\20190820\\'    file_save_path = r'D:\MyProject\Python\Voice_SDK\20190820_xunfei\\'    for file in file_list: #多并发批量执行        wav_file_path = file_read_path + file + '.wav'        txt_file_path = file_save_path + file + '.txt'        t = threading.Thread(target=thread_func, args=(wav_file_path, txt_file_path))        t.start()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约