diff --git a/AutoCoverTool/online/tone_shift_one.py b/AutoCoverTool/online/tone_shift_one.py index 446be54..232516d 100644 --- a/AutoCoverTool/online/tone_shift_one.py +++ b/AutoCoverTool/online/tone_shift_one.py @@ -1,368 +1,369 @@ """ 变调的方式做处理 1. 下载 2. 分离 3. 针对于人声变调+2,伴奏+1 4. 合成 """ import os import json import shutil import librosa import logging import numpy as np import multiprocessing as mp from ref.music_remover.separate_interface import SeparateInterface from online.inference_worker import upload_file2cos, gs_state_use, gs_state_finish, gs_state_default from online.common import * from ref.online.voice_class_online import VoiceClass logging.basicConfig(filename='/tmp/tone_shift_one.log', level=logging.INFO) gs_tone_shift_exe = "/data/gpu_env_common/res/av_svc/bin/tone_shift_exe" gs_simple_mixer_path = "/data/gpu_env_common/res/av_svc/bin/simple_mixer" gs_err_code_success = 0 gs_err_code_tone_shift = 1 gs_err_code_mix = 2 gs_err_code_transcode = 3 gs_err_code_upload = 4 gs_err_code_download = 5 gs_err_code_trans_to_mp3 = 6 gs_err_code_separate = 7 gs_err_code_duration_too_long = 8 gs_err_code_duration_no_vocal = 9 gs_err_code_duration_err = 10 gs_err_code_transcode_acc = 11 gs_err_code_upload_acc = 12 gs_err_code_download_acc = 13 gs_err_code_download_vocal = 14 gs_err_code_transcode_acc_v1 = 15 gs_err_code_transcode_vocal_v1 = 16 gs_err_code_silence_no_data = 17 gs_err_code_silence_no_process = 18 def post_process_err_callback(msg): print("ERROR|post_process|task_error_callback:", msg) def exec_cmd(cmd): r = os.popen(cmd) text = r.read() r.close() return text def get_d(audio_path): cmd = "ffprobe -v quiet -print_format json -show_format -show_streams {}".format(audio_path) data = exec_cmd(cmd) data = json.loads(data) # 返回秒 if 'format' in data.keys() and 'duration' in data['format']: return float(data["format"]["duration"]) return -1 def get_mean_power(audio_path): sr = 44100 audio, sr = librosa.load(audio_path, sr=sr, mono=True) mm = np.mean(np.abs(audio)) return mm def tone_shift_one(in_file, dst_file, pitch): cmd = "{} {} {} {}".format(gs_tone_shift_exe, in_file, dst_file, pitch) os.system(cmd) return os.path.exists(dst_file) def mix(cid, vocal_path, acc_path, tp): if tp == 1: vocal_pitch = 2 acc_pitch = 0 else: vocal_pitch = -2 acc_pitch = 0 vocal_path_2 = vocal_path.replace(".wav", "_{}.wav".format(vocal_pitch)) acc_path_2 = acc_path.replace(".wav", "_{}.wav".format(acc_pitch)) err = tone_shift_one(vocal_path, vocal_path_2, vocal_pitch) if not err: return gs_err_code_tone_shift, None, None, tp err = tone_shift_one(acc_path, acc_path_2, acc_pitch) if not err: return gs_err_code_tone_shift, None, None, tp base_dir = os.path.dirname(vocal_path) mix_path = "{}/mix_{}_{}.wav".format(base_dir, vocal_pitch, acc_pitch) cmd = "{} {} {} {}".format(gs_simple_mixer_path, vocal_path_2, acc_path_2, mix_path) print("exec_cmd={}".format(cmd)) os.system(cmd) if not os.path.exists(mix_path): return gs_err_code_mix, None, None, tp # 转码 mix_path_mp3 = mix_path.replace(".wav", ".mp4") cmd = "ffmpeg -i {} -b:a 128k -c:a aac -ar 44100 -ac 2 -y {} -loglevel fatal".format(mix_path, mix_path_mp3) os.system(cmd) if not os.path.exists(mix_path_mp3): return gs_err_code_transcode, None, None, tp # 上传到cos mix_name = os.path.basename(mix_path_mp3) key = "av_res/svc_res_tone_shift/{}/{}".format(str(cid), mix_name) if not upload_file2cos(key, mix_path_mp3): return gs_err_code_upload, None, None return gs_err_code_success, key, vocal_path_2, tp class ToneShift: def __init__(self): self.separate_inst = SeparateInterface() model_path = "/data/gpu_env_common/res/av_svc/models" music_voice_pure_model = os.path.join(model_path, "voice_005_rec_v5.pth") music_voice_no_pure_model = os.path.join(model_path, "voice_10_v5.pth") gender_pure_model = os.path.join(model_path, "gender_8k_ratev5_v6_adam.pth") gender_no_pure_model = os.path.join(model_path, "gender_8k_v6_adam.pth") self.voice_class = VoiceClass(music_voice_pure_model, music_voice_no_pure_model, gender_pure_model, gender_no_pure_model) def update_state(self, song_id, state): sql = "update svc_queue_table set state={},update_time={} where song_id = {}". \ format(state, int(time.time()), song_id) banned_user_map['db'] = "av_db" update_db(sql, banned_user_map) def get_url_by_id(self, song_id): sql = "select song_id, url from svc_queue_table where song_id={}".format(song_id) banned_user_map["db"] = "av_db" data = get_data_by_mysql(sql) if len(data) == 0: return None, None return str(data[0][0]), data[0][1] def get_one_data_logic(self): """ 按照5,4,3的优先级进行获取 :return: """ song_src_arr = [5, 4, 3] for song_src in song_src_arr: song_id, song_url = self.get_one_data(song_src=song_src) if song_id is not None: return song_id, song_url return None, None def get_one_data(self, song_src=3): sql = "select song_id, url from svc_queue_table where state = 0 and song_src={} order by create_time asc limit 1".format( song_src) banned_user_map["db"] = "av_db" data = get_data_by_mysql(sql, banned_user_map) if len(data) == 0: return None, None song_id, song_url = data[0] if song_id != "": self.update_state(song_id, gs_state_use) return str(song_id), song_url def pre_process(self, work_dir, song_url): """ 创建文件夹,下载数据 :return: """ if "?sign=" in song_url: return gs_err_code_download ext = str(song_url).split(".")[-1] dst_file = "{}/src_origin.{}".format(work_dir, ext) cmd = "wget {} -O {}".format(song_url, dst_file) os.system(cmd) if not os.path.exists(dst_file): return gs_err_code_download duration = get_d(dst_file) if duration < 0: return gs_err_code_duration_err print("Duration:", dst_file, duration) if duration > 20 * 60: return gs_err_code_duration_too_long dst_mp3_file = "{}/src.wav".format(work_dir) cmd = "ffmpeg -i {} -ar 44100 -ac 2 -y {} ".format(dst_file, dst_mp3_file) os.system(cmd) if not os.path.exists(dst_mp3_file): return gs_err_code_trans_to_mp3 return gs_err_code_success def upload_acc(self, cid, acc_path): # 转码 mix_path_aac = acc_path.replace(".wav", ".m4a") cmd = "ffmpeg -i {} -b:a 128k -c:a aac -ar 44100 -ac 2 -y {} -loglevel fatal".format(acc_path, mix_path_aac) os.system(cmd) if not os.path.exists(mix_path_aac): return gs_err_code_transcode_acc, None # 上传 mix_name = os.path.basename(mix_path_aac) key = "av_res/svc_res_tone_shift/{}/{}".format(str(cid), mix_name) if not upload_file2cos(key, mix_path_aac): return gs_err_code_upload_acc, None return gs_err_code_success, key def async_mix(self, cid, vocal_path, acc_path): pool = mp.Pool(processes=2) res = [] for i in range(1, 3): ret = pool.apply_async(mix, args=(cid, vocal_path, acc_path, i), error_callback=post_process_err_callback) res.append(ret) pool.close() pool.join() real_res = [] for i in res: real_res.append(i.get(timeout=10 * 60)) return real_res def process_one(self, cid, work_dir): """ :param cid: :param work_dir: :return: """ src_mp3 = os.path.join(work_dir, "src.wav") vocal_path = os.path.join(work_dir, "vocal.wav") acc_path = os.path.join(work_dir, "acc.wav") if not (os.path.exists(vocal_path) and os.path.exists(acc_path)): if not self.separate_inst.process(cid, src_mp3, vocal_path, acc_path): return gs_err_code_separate, [] if not os.path.exists(vocal_path) or not os.path.exists(acc_path): return gs_err_code_separate, [] # 当人声的平均能量小于一定值时,则认为无人声(0.01是经验值判定,样本分析来看) # 无人声的样本[0.0056, 0.0003], 有人声的样本(目前最小)[0.046, 0.049] print("power:{},{}".format(cid, get_mean_power(vocal_path))) if get_mean_power(vocal_path) < 0.02: return gs_err_code_duration_no_vocal, [] rets = self.async_mix(cid, vocal_path, acc_path) out_mix_mp3 = ["", ""] out_vocal_path = ["", ""] for ret in rets: err, mix_mp3, vocal_path, tp = ret if err != gs_err_code_success: return err, [] out_mix_mp3[tp - 1] = mix_mp3 out_vocal_path[tp - 1] = vocal_path out_gender = [] for i in range(len(out_vocal_path)): gender, female_rate = self.voice_class.process_one(out_vocal_path[i]) # 性别映射,由0:女 1:男 2:未知 映射为 1:男 2:女 3: 未知 # GENDER_FEMALE = 0,GENDER_MALE = 1,GENDER_OTHER = 2 mmap = [2, 1, 3] gender = mmap[gender] out_gender.append(str(gender)) # 音频1,音频2,性别1,性别2 real_msg = [out_mix_mp3[0], out_mix_mp3[1], out_gender[0], out_gender[1]] return gs_err_code_success, real_msg def download_and_transcode(self, url, local_path, local_path_wav): cmd = "wget {} -O {}".format(url, local_path) os.system(cmd) if not os.path.exists(local_path): return -1 cmd = "ffmpeg -i {} -ar 44100 -ac 2 -y {}".format(local_path, local_path_wav) os.system(cmd) if not os.path.exists(local_path_wav): return -2 return 0 def get_data_from_mysql(self, cid, work_dir): sql = "select starmaker_songid,task_url,complete_url,voice_url from starmaker_musicbook.silence where starmaker_songid={} order by task_id desc limit 1".format( cid) data = get_data_by_mysql(sql, banned_user_map) if len(data) == 0: return gs_err_code_silence_no_data song_id, task_url, complete_url, voice_url = data[0] if complete_url != "" and voice_url != "": """ 将人声与伴奏下载下来 """ ext = str(complete_url).split(".")[-1] acc_dst_file = os.path.join(work_dir, "acc.{}".format(ext)) acc_wav_dst_file = os.path.join(work_dir, "acc.wav") err = self.download_and_transcode(complete_url, acc_dst_file, acc_wav_dst_file) os.unlink(acc_dst_file) if err == -1: return gs_err_code_download_acc if err == -2: return gs_err_code_transcode_acc_v1 ext = str(voice_url).split(".")[-1] vocal_dst_file = os.path.join(work_dir, "vocal.{}".format(ext)) vocal_wav_dst_file = os.path.join(work_dir, "vocal.wav") err = self.download_and_transcode(voice_url, vocal_dst_file, vocal_wav_dst_file) os.unlink(vocal_dst_file) if err == -1: return gs_err_code_download_vocal if err == -2: return gs_err_code_transcode_vocal_v1 return gs_err_code_success return gs_err_code_silence_no_process def process_worker(self): logging.info("start process_worker .....") base_dir = "/tmp/tone_shift_one" if not os.path.exists(base_dir): os.makedirs(base_dir) while True: worker_st = time.time() cid, song_url = self.get_one_data_logic() - # cid, song_url = self.get_url_by_id('611752105030548048') + # cid, song_url = self.get_url_by_id('611752105030838774') if cid is None: time.sleep(5) logging.info("get one data is None ...") continue work_dir = os.path.join(base_dir, str(cid)) if os.path.exists(work_dir): shutil.rmtree(work_dir) os.makedirs(work_dir) # 先查看消音数据库中是否已经完成了该项目,已经有的话,就直接下载即可 err = self.get_data_from_mysql(cid, work_dir) if err != gs_err_code_success: # 清空磁盘 shutil.rmtree(work_dir) os.makedirs(work_dir) err = self.pre_process(work_dir, song_url) if err != gs_err_code_success: self.update_state(str(cid), -err) continue - st = time.time() - err, data = self.process_one(str(cid), work_dir) - logging.info("process_finish,{},{}".format(cid, time.time() - st)) - if err == gs_err_code_success and len(data) != 0: - sql = "update svc_queue_table set state={},update_time={},svc_url=\"{}\" where song_id = {}". \ - format(gs_state_finish, int(time.time()), ",".join(data), str(cid)) - banned_user_map['db'] = "av_db" - update_db(sql, banned_user_map) - else: - self.update_state(str(cid), -err) - shutil.rmtree(work_dir) - logging.info("process_finish,{},{}".format(cid, time.time() - worker_st)) + st = time.time() + err, data = self.process_one(str(cid), work_dir) + logging.info("process_finish,{},{}".format(cid, time.time() - st)) + if err == gs_err_code_success and len(data) != 0: + sql = "update svc_queue_table set state={},update_time={},svc_url=\"{}\" where song_id = {}". \ + format(gs_state_finish, int(time.time()), ",".join(data), str(cid)) + banned_user_map['db'] = "av_db" + update_db(sql, banned_user_map) + else: + self.update_state(str(cid), -err) + shutil.rmtree(work_dir) + logging.info("process_finish,{},{}".format(cid, time.time() - worker_st)) + if __name__ == '__main__': ts = ToneShift() ts.process_worker()