diff --git a/AutoCoverTool/online/tone_shift_one.py b/AutoCoverTool/online/tone_shift_one.py index 7c87e42..d4e2b7c 100644 --- a/AutoCoverTool/online/tone_shift_one.py +++ b/AutoCoverTool/online/tone_shift_one.py @@ -1,342 +1,369 @@ """ 变调的方式做处理 1. 下载 2. 分离 3. 针对于人声变调+2,伴奏+1 4. 合成 """ import os import json import shutil import librosa import logging import numpy as np +import multiprocessing as mp + from ref.music_remover.separate_interface import SeparateInterface from online.inference_worker import upload_file2cos, gs_state_use, gs_state_finish, gs_state_default from online.common import * from ref.online.voice_class_online import VoiceClass logging.basicConfig(filename='/tmp/tone_shift_one.log', level=logging.INFO) gs_tone_shift_exe = "/opt/soft/bin/tone_shift_exe" gs_simple_mixer_path = "/opt/soft/bin/simple_mixer" gs_err_code_success = 0 gs_err_code_tone_shift = 1 gs_err_code_mix = 2 gs_err_code_transcode = 3 gs_err_code_upload = 4 gs_err_code_download = 5 gs_err_code_trans_to_mp3 = 6 gs_err_code_separate = 7 gs_err_code_duration_too_long = 8 gs_err_code_duration_no_vocal = 9 gs_err_code_duration_err = 10 gs_err_code_transcode_acc = 11 gs_err_code_upload_acc = 12 gs_err_code_download_acc = 13 gs_err_code_download_vocal = 14 gs_err_code_transcode_acc_v1 = 15 gs_err_code_transcode_vocal_v1 = 16 gs_err_code_silence_no_data = 17 gs_err_code_silence_no_process = 18 +def post_process_err_callback(msg): + print("ERROR|post_process|task_error_callback:", msg) + + def exec_cmd(cmd): r = os.popen(cmd) text = r.read() r.close() return text def get_d(audio_path): cmd = "ffprobe -v quiet -print_format json -show_format -show_streams {}".format(audio_path) data = exec_cmd(cmd) data = json.loads(data) # 返回秒 if 'format' in data.keys() and 'duration' in data['format']: return float(data["format"]["duration"]) return -1 def get_mean_power(audio_path): sr = 44100 audio, sr = librosa.load(audio_path, sr=sr, mono=True) mm = np.mean(np.abs(audio)) return mm +def tone_shift_one(in_file, dst_file, pitch): + cmd = "{} {} {} {}".format(gs_tone_shift_exe, in_file, dst_file, pitch) + os.system(cmd) + return os.path.exists(dst_file) + + +def mix(cid, vocal_path, acc_path, tp): + if tp == 1: + vocal_pitch = 2 + acc_pitch = 0 + else: + vocal_pitch = -2 + acc_pitch = 0 + + vocal_path_2 = vocal_path.replace(".wav", "_{}.wav".format(vocal_pitch)) + acc_path_2 = acc_path.replace(".wav", "_{}.wav".format(acc_pitch)) + err = tone_shift_one(vocal_path, vocal_path_2, vocal_pitch) + if not err: + return gs_err_code_tone_shift, None, None, tp + + err = tone_shift_one(acc_path, acc_path_2, acc_pitch) + if not err: + return gs_err_code_tone_shift, None, None, tp + + base_dir = os.path.dirname(vocal_path) + mix_path = "{}/mix_{}_{}.wav".format(base_dir, vocal_pitch, acc_pitch) + cmd = "{} {} {} {}".format(gs_simple_mixer_path, vocal_path_2, acc_path_2, mix_path) + print("exec_cmd={}".format(cmd)) + os.system(cmd) + + if not os.path.exists(mix_path): + return gs_err_code_mix, None, None, tp + + # 转码 + mix_path_mp3 = mix_path.replace(".wav", ".mp4") + cmd = "ffmpeg -i {} -b:a 128k -c:a aac -ar 44100 -ac 2 -y {} -loglevel fatal".format(mix_path, mix_path_mp3) + os.system(cmd) + if not os.path.exists(mix_path_mp3): + return gs_err_code_transcode, None, None, tp + + # 上传到cos + mix_name = os.path.basename(mix_path_mp3) + key = "av_res/svc_res_tone_shift/{}/{}".format(str(cid), mix_name) + if not upload_file2cos(key, mix_path_mp3): + return gs_err_code_upload, None, None + return gs_err_code_success, key, vocal_path_2, tp + + class ToneShift: def __init__(self): self.separate_inst = SeparateInterface() model_path = "./models" music_voice_pure_model = os.path.join(model_path, "voice_005_rec_v5.pth") music_voice_no_pure_model = os.path.join(model_path, "voice_10_v5.pth") gender_pure_model = os.path.join(model_path, "gender_8k_ratev5_v6_adam.pth") gender_no_pure_model = os.path.join(model_path, "gender_8k_v6_adam.pth") self.voice_class = VoiceClass(music_voice_pure_model, music_voice_no_pure_model, gender_pure_model, gender_no_pure_model) def update_state(self, song_id, state): sql = "update svc_queue_table set state={},update_time={} where song_id = {}". \ format(state, int(time.time()), song_id) banned_user_map['db'] = "av_db" update_db(sql, banned_user_map) def get_url_by_id(self, song_id): sql = "select song_id, url from svc_queue_table where song_id={}".format(song_id) banned_user_map["db"] = "av_db" data = get_data_by_mysql(sql) if len(data) == 0: return None, None return str(data[0][0]), data[0][1] def get_one_data_logic(self): """ 按照5,4,3的优先级进行获取 :return: """ song_src_arr = [5, 4, 3] for song_src in song_src_arr: song_id, song_url = self.get_one_data(song_src=song_src) if song_id is not None: return song_id, song_url return None, None def get_one_data(self, song_src=3): sql = "select song_id, url from svc_queue_table where state = 0 and song_src={} order by create_time asc limit 1".format( song_src) banned_user_map["db"] = "av_db" data = get_data_by_mysql(sql, banned_user_map) if len(data) == 0: return None, None song_id, song_url = data[0] if song_id != "": self.update_state(song_id, gs_state_use) return str(song_id), song_url def pre_process(self, work_dir, song_url): """ 创建文件夹,下载数据 :return: """ if "?sign=" in song_url: return gs_err_code_download ext = str(song_url).split(".")[-1] dst_file = "{}/src_origin.{}".format(work_dir, ext) cmd = "wget {} -O {}".format(song_url, dst_file) os.system(cmd) if not os.path.exists(dst_file): return gs_err_code_download duration = get_d(dst_file) if duration < 0: return gs_err_code_duration_err print("Duration:", dst_file, duration) if duration > 20 * 60: return gs_err_code_duration_too_long dst_mp3_file = "{}/src.wav".format(work_dir) cmd = "ffmpeg -i {} -ar 44100 -ac 2 -y {} ".format(dst_file, dst_mp3_file) os.system(cmd) if not os.path.exists(dst_mp3_file): return gs_err_code_trans_to_mp3 return gs_err_code_success - def tone_shift_one(self, in_file, dst_file, pitch): - cmd = "{} {} {} {}".format(gs_tone_shift_exe, in_file, dst_file, pitch) - os.system(cmd) - return os.path.exists(dst_file) - - def mix(self, cid, vocal_path, acc_path, tp): - if tp == 1: - vocal_pitch = 2 - acc_pitch = 0 - else: - vocal_pitch = -2 - acc_pitch = 0 - - vocal_path_2 = vocal_path.replace(".wav", "_{}.wav".format(vocal_pitch)) - acc_path_2 = acc_path.replace(".wav", "_{}.wav".format(acc_pitch)) - err = self.tone_shift_one(vocal_path, vocal_path_2, vocal_pitch) - if not err: - return gs_err_code_tone_shift, None, None - - gender, female_rate = self.voice_class.process_one(vocal_path_2) - # 性别映射,由0:女 1:男 2:未知 映射为 1:男 2:女 3: 未知 - # GENDER_FEMALE = 0,GENDER_MALE = 1,GENDER_OTHER = 2 - mmap = [2, 1, 3] - gender = mmap[gender] - err = self.tone_shift_one(acc_path, acc_path_2, acc_pitch) - if not err: - return gs_err_code_tone_shift, None, None - - base_dir = os.path.dirname(vocal_path) - mix_path = "{}/mix_{}_{}.wav".format(base_dir, vocal_pitch, acc_pitch) - cmd = "{} {} {} {}".format(gs_simple_mixer_path, vocal_path_2, acc_path_2, mix_path) - print("exec_cmd={}".format(cmd)) - os.system(cmd) - - if not os.path.exists(mix_path): - return gs_err_code_mix, None, None - - # 转码 - mix_path_mp3 = mix_path.replace(".wav", ".mp4") - cmd = "ffmpeg -i {} -b:a 128k -c:a aac -ar 44100 -ac 2 -y {} -loglevel fatal".format(mix_path, mix_path_mp3) - os.system(cmd) - if not os.path.exists(mix_path_mp3): - return gs_err_code_transcode, None, None - - # 上传到cos - mix_name = os.path.basename(mix_path_mp3) - key = "av_res/svc_res_tone_shift/{}/{}".format(str(cid), mix_name) - if not upload_file2cos(key, mix_path_mp3): - return gs_err_code_upload, None, None - return gs_err_code_success, key, gender - def upload_acc(self, cid, acc_path): # 转码 mix_path_aac = acc_path.replace(".wav", ".m4a") cmd = "ffmpeg -i {} -b:a 128k -c:a aac -ar 44100 -ac 2 -y {} -loglevel fatal".format(acc_path, mix_path_aac) os.system(cmd) if not os.path.exists(mix_path_aac): return gs_err_code_transcode_acc, None # 上传 mix_name = os.path.basename(mix_path_aac) key = "av_res/svc_res_tone_shift/{}/{}".format(str(cid), mix_name) if not upload_file2cos(key, mix_path_aac): return gs_err_code_upload_acc, None return gs_err_code_success, key + def async_mix(self, cid, vocal_path, acc_path): + pool = mp.Pool(processes=2) + res = [] + for i in range(1, 3): + ret = pool.apply_async(mix, args=(cid, vocal_path, acc_path, i), error_callback=post_process_err_callback) + res.append(ret) + pool.close() + pool.join() + + real_res = [] + for i in res: + real_res.append(i.get(timeout=10 * 60)) + return real_res + def process_one(self, cid, work_dir): """ :param cid: :param work_dir: :return: """ src_mp3 = os.path.join(work_dir, "src.wav") vocal_path = os.path.join(work_dir, "vocal.wav") acc_path = os.path.join(work_dir, "acc.wav") if not (os.path.exists(vocal_path) and os.path.exists(acc_path)): if not self.separate_inst.process(cid, src_mp3, vocal_path, acc_path): return gs_err_code_separate, [] if not os.path.exists(vocal_path) or not os.path.exists(acc_path): return gs_err_code_separate, [] # 当人声的平均能量小于一定值时,则认为无人声(0.01是经验值判定,样本分析来看) # 无人声的样本[0.0056, 0.0003], 有人声的样本(目前最小)[0.046, 0.049] print("power:{},{}".format(cid, get_mean_power(vocal_path))) if get_mean_power(vocal_path) < 0.02: return gs_err_code_duration_no_vocal, [] - err, type1_mix_mp3, gender = self.mix(cid, vocal_path, acc_path, 1) - if err != gs_err_code_success: - return err, [] - - err, type2_mix_mp3, gender2 = self.mix(cid, vocal_path, acc_path, 2) - if err != gs_err_code_success: - return err, [] - - # 上传伴奏文件 - # err, acc_path_m4a = self.upload_acc(cid, acc_path) - # if err != gs_err_code_success: - # return err, [] - return gs_err_code_success, [type1_mix_mp3, type2_mix_mp3, str(gender), str(gender2)] + + rets = self.async_mix(cid, vocal_path, acc_path) + out_mix_mp3 = ["", ""] + out_vocal_path = ["", ""] + for ret in rets: + err, mix_mp3, vocal_path, tp = ret + if err != gs_err_code_success: + return err, [] + out_mix_mp3[tp - 1] = mix_mp3 + out_vocal_path[tp - 1] = vocal_path + + out_gender = [] + for i in range(len(out_vocal_path)): + gender, female_rate = self.voice_class.process_one(out_vocal_path[i]) + # 性别映射,由0:女 1:男 2:未知 映射为 1:男 2:女 3: 未知 + # GENDER_FEMALE = 0,GENDER_MALE = 1,GENDER_OTHER = 2 + mmap = [2, 1, 3] + gender = mmap[gender] + out_gender.append(str(gender)) + + # 音频1,音频2,性别1,性别2 + real_msg = [out_mix_mp3[0], out_mix_mp3[1], out_gender[0], out_gender[1]] + return gs_err_code_success, real_msg def download_and_transcode(self, url, local_path, local_path_wav): cmd = "wget {} -O {}".format(url, local_path) os.system(cmd) if not os.path.exists(local_path): return -1 cmd = "ffmpeg -i {} -ar 44100 -ac 2 -y {}".format(local_path, local_path_wav) os.system(cmd) if not os.path.exists(local_path_wav): return -2 return 0 def get_data_from_mysql(self, cid, work_dir): sql = "select starmaker_songid,task_url,complete_url,voice_url from starmaker_musicbook.silence where starmaker_songid={} order by task_id desc limit 1".format( cid) data = get_data_by_mysql(sql, banned_user_map) if len(data) == 0: return gs_err_code_silence_no_data song_id, task_url, complete_url, voice_url = data[0] if complete_url != "" and voice_url != "": """ 将人声与伴奏下载下来 """ ext = str(complete_url).split(".")[-1] acc_dst_file = os.path.join(work_dir, "acc.{}".format(ext)) acc_wav_dst_file = os.path.join(work_dir, "acc.wav") err = self.download_and_transcode(complete_url, acc_dst_file, acc_wav_dst_file) os.unlink(acc_dst_file) if err == -1: return gs_err_code_download_acc if err == -2: return gs_err_code_transcode_acc_v1 ext = str(voice_url).split(".")[-1] vocal_dst_file = os.path.join(work_dir, "vocal.{}".format(ext)) vocal_wav_dst_file = os.path.join(work_dir, "vocal.wav") err = self.download_and_transcode(voice_url, vocal_dst_file, vocal_wav_dst_file) os.unlink(vocal_dst_file) if err == -1: return gs_err_code_download_vocal if err == -2: return gs_err_code_transcode_vocal_v1 return gs_err_code_success return gs_err_code_silence_no_process def process_worker(self): logging.info("start process_worker .....") base_dir = "/tmp/tone_shift_one" if not os.path.exists(base_dir): os.makedirs(base_dir) while True: worker_st = time.time() cid, song_url = self.get_one_data_logic() # cid, song_url = self.get_url_by_id('611752105030548048') if cid is None: time.sleep(5) logging.info("get one data is None ...") continue work_dir = os.path.join(base_dir, str(cid)) if os.path.exists(work_dir): shutil.rmtree(work_dir) os.makedirs(work_dir) # 先查看消音数据库中是否已经完成了该项目,已经有的话,就直接下载即可 err = self.get_data_from_mysql(cid, work_dir) if err != gs_err_code_success: # 清空磁盘 shutil.rmtree(work_dir) os.makedirs(work_dir) err = self.pre_process(work_dir, song_url) if err != gs_err_code_success: self.update_state(str(cid), -err) continue st = time.time() err, data = self.process_one(str(cid), work_dir) logging.info("process_finish,{},{}".format(cid, time.time() - st)) if err == gs_err_code_success and len(data) != 0: sql = "update svc_queue_table set state={},update_time={},svc_url=\"{}\" where song_id = {}". \ format(gs_state_finish, int(time.time()), ",".join(data), str(cid)) banned_user_map['db'] = "av_db" update_db(sql, banned_user_map) else: self.update_state(str(cid), -err) shutil.rmtree(work_dir) logging.info("process_finish,{},{}".format(cid, time.time() - worker_st)) if __name__ == '__main__': ts = ToneShift() ts.process_worker()