diff --git a/AIMeiSheng/docker_demo/offline_server.py b/AIMeiSheng/docker_demo/offline_server.py index 48205ea..0fc025e 100644 --- a/AIMeiSheng/docker_demo/offline_server.py +++ b/AIMeiSheng/docker_demo/offline_server.py @@ -1,153 +1,159 @@ # -*- coding: UTF-8 -*- """ 离线处理: 使用redis进行交互,从redis中获取数据资源,在将结果写入到redis """ import os import sys import time import json import socket import hashlib from redis_helper import RedisHelper from cos_helper import CosHelper from common import * import logging from svc_online import GSWorkerAttr, SVCOnline, volume_adjustment, svc_offline_logger sys.path.append(os.path.dirname(__file__)) sys.path.append(os.path.join(os.path.dirname(__file__), "../")) def download_data(worker_attr): if os.path.exists(worker_attr.vocal_path): os.unlink(worker_attr.vocal_path) st = time.time() if not download2disk(worker_attr.vocal_url, worker_attr.vocal_path): return gs_err_code_download_vocal svc_offline_logger.info(f"download vocal_url={worker_attr.vocal_url} sp = {time.time() - st}") # download svc_source_url if not os.path.exists(worker_attr.female_svc_source_path): st = time.time() if not download2disk(worker_attr.female_svc_source_url, worker_attr.female_svc_source_path): return gs_err_code_download_svc_url svc_offline_logger.info(f"download female_url={worker_attr.female_svc_source_url} sp = {time.time() - st}") # download svc_source_url if not os.path.exists(worker_attr.male_svc_source_path): st = time.time() if not download2disk(worker_attr.male_svc_source_url, worker_attr.male_svc_source_path): return gs_err_code_download_svc_url svc_offline_logger.info(f"download male_url={worker_attr.male_svc_source_url} sp = {time.time() - st}") return gs_err_code_success def transcode(wav_path, dst_path): st = time.time() cmd = f"ffmpeg -i {wav_path} -ar 44100 -ac 1 -b:a 64k -y {dst_path} -loglevel fatal" exec_cmd(cmd) svc_offline_logger.info(f"transcode cmd={cmd}, sp = {time.time() - st}") return os.path.exists(dst_path) class OfflineServer: def __init__(self, redis_conf, server_conf, update_redis=False): st = time.time() self.redis_helper = RedisHelper(redis_conf) self.cos_helper = CosHelper() self.svc_online = SVCOnline() self.server_conf = server_conf self.distinct_key = server_conf["ai_meisheng_key_prefix"] self.update_redis = update_redis svc_offline_logger.info(f"config={redis_conf}---server_conf={self.server_conf}") svc_offline_logger.info(f"offline init finish sp={time.time() - st}") def exists(self): return self.redis_helper.exists(self.distinct_key) def update_result(self, errcode, schedule, gender, target_song_url): msg = { "status": errcode, "schedule": schedule, "gender": gender, "target_song_url": target_song_url, } # 结果保存15min if self.update_redis: self.redis_helper.set(self.distinct_key, json.dumps(msg)) self.redis_helper.expire(self.distinct_key, 60 * 10) def process_one(self, worker_attr): self.distinct_key = self.server_conf["ai_meisheng_key_prefix"] + worker_attr.distinct_id svc_offline_logger.info(f"{worker_attr.log_info_name()}, start download ...") err = download_data(worker_attr) if err != gs_err_code_success: self.update_result(err, 100, "unknown", worker_attr.target_url) return err, None, None self.update_result(err, 35, "unknown", worker_attr.target_url) + if not transcode(worker_attr.vocal_path, worker_attr.vocal_tmp_path): + return gs_err_code_transcode, None, None + + os.unlink(worker_attr.vocal_path) + os.rename(worker_attr.vocal_tmp_path, worker_attr.vocal_path) + svc_offline_logger.info(f"{worker_attr.log_info_name()}, start process ...") gender, err_code = self.svc_online.process(worker_attr) if err_code == gs_err_code_target_silence : #unvoice err return gs_err_code_target_silence, None, None if not os.path.exists(worker_attr.target_wav_path): self.update_result(gs_err_code_svc_process, 100, gender, worker_attr.target_url) return gs_err_code_svc_process, None, None self.update_result(err, 85, gender, worker_attr.target_url) # 音量拉伸到指定响度 svc_offline_logger.info(f"{worker_attr.log_info_name()}, start volume_adjustment ...") volume_adjustment(worker_attr.target_wav_path, worker_attr.target_loudness, worker_attr.target_wav_ad_path) if not os.path.exists(worker_attr.target_wav_ad_path): self.update_result(gs_err_code_volume_adjust, 100, gender, worker_attr.target_url) return gs_err_code_volume_adjust, None, None self.update_result(err, 90, gender, worker_attr.target_url) # transcode svc_offline_logger.info(f"{worker_attr.log_info_name()}, start transcode ...") if not transcode(worker_attr.target_wav_ad_path, worker_attr.target_path): self.update_result(gs_err_code_transcode, 100, gender, worker_attr.target_url) return gs_err_code_transcode, None, None self.update_result(err, 95, gender, worker_attr.target_url) # upload svc_offline_logger.info(f"{worker_attr.log_info_name()}, start upload_file2cos ...") st = time.time() if not self.cos_helper.upload_by_url(worker_attr.target_path, worker_attr.target_url): self.update_result(gs_err_code_upload, 100, gender, worker_attr.target_url) return gs_err_code_upload, None, None self.update_result(gs_err_code_success, 100, gender, worker_attr.target_url) svc_offline_logger.info(f"{worker_attr.log_info_name()} upload {worker_attr.target_url} sp = {time.time() - st}") return gs_err_code_success, worker_attr.target_url, gender def process(self): while True: data = self.redis_helper.rpop(self.server_conf["producer"]) if data is None: time.sleep(1) continue data = json.loads(data) if not check_input(data): svc_offline_logger.error(f"input data error={data}") continue worker_attr = GSWorkerAttr(data) self.distinct_key = self.server_conf["ai_meisheng_key_prefix"] + worker_attr.distinct_id if not self.exists(): svc_offline_logger.warning(f"input {data}, timeout abandon ....") worker_attr.rm_cache() continue st = time.time() errcode, target_path, gender = self.process_one(worker_attr) self.update_result(errcode, 100, gender, target_path) svc_offline_logger.info(f"{worker_attr.log_info_name()} finish errcode={errcode} sp = {time.time() - st}") worker_attr.rm_cache() if __name__ == '__main__': offline_server = OfflineServer(gs_redis_conf, gs_server_redis_conf, True) offline_server.process() diff --git a/AIMeiSheng/docker_demo/svc_online.py b/AIMeiSheng/docker_demo/svc_online.py index e910f5f..95ab3ff 100644 --- a/AIMeiSheng/docker_demo/svc_online.py +++ b/AIMeiSheng/docker_demo/svc_online.py @@ -1,188 +1,189 @@ # -*- coding: UTF-8 -*- """ SVC的核心处理逻辑 """ import os import time import socket import shutil import hashlib from AIMeiSheng.meisheng_svc_final import load_model, process_svc_online from AIMeiSheng.meisheng_env_preparex import meisheng_env_prepare from AIMeiSheng.voice_classification.online.voice_class_online_fang import VoiceClass, download_volume_balanced from AIMeiSheng.docker_demo.common import * import logging hostname = socket.gethostname() log_file_name = f"{os.path.dirname(os.path.abspath(__file__))}/av_meisheng_{hostname}.log" # 设置logger svc_offline_logger = logging.getLogger("svc_offline") file_handler = logging.FileHandler(log_file_name) file_handler.setLevel(logging.INFO) formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s', datefmt='%Y-%m-%d %I:%M:%S') file_handler.setFormatter(formatter) if gs_prod: svc_offline_logger.addHandler(file_handler) if os.path.exists(gs_tmp_dir): shutil.rmtree(gs_tmp_dir) os.makedirs(gs_model_dir, exist_ok=True) os.makedirs(gs_resource_cache_dir, exist_ok=True) # 预设参数 gs_gender_models_url = "https://av-audit-sync-sg-1256122840.cos.ap-singapore.myqcloud.com/hub/voice_classification/models.zip" gs_volume_bin_url = "https://av-audit-sync-sg-1256122840.cos.ap-singapore.myqcloud.com/dataset/AIMeiSheng/ebur128_tool" class GSWorkerAttr: def __init__(self, input_data): # 取出输入资源 vocal_url = input_data["record_song_url"] target_url = input_data["target_url"] start = input_data["start"] # 单位是ms end = input_data["end"] # 单位是ms vocal_loudness = input_data["vocal_loudness"] female_recording_url = input_data["female_recording_url"] male_recording_url = input_data["male_recording_url"] self.distinct_id = hashlib.md5(vocal_url.encode()).hexdigest() self.tmp_dir = os.path.join(gs_tmp_dir, self.distinct_id) if os.path.exists(self.tmp_dir): shutil.rmtree(self.tmp_dir) os.makedirs(self.tmp_dir) self.vocal_url = vocal_url self.target_url = target_url ext = vocal_url.split(".")[-1] self.vocal_path = os.path.join(self.tmp_dir, self.distinct_id + f"_in.{ext}") + self.vocal_tmp_path = os.path.join(self.tmp_dir, self.distinct_id + f"_tmp.{ext}") self.target_wav_path = os.path.join(self.tmp_dir, self.distinct_id + "_out.wav") self.target_wav_ad_path = os.path.join(self.tmp_dir, self.distinct_id + "_out_ad.wav") self.target_path = os.path.join(self.tmp_dir, self.distinct_id + "_out.m4a") self.female_svc_source_url = female_recording_url self.male_svc_source_url = male_recording_url ext = female_recording_url.split(".")[-1] self.female_svc_source_path = os.path.join(gs_resource_cache_dir, hashlib.md5(female_recording_url.encode()).hexdigest() + "." + ext) ext = male_recording_url.split(".")[-1] self.male_svc_source_path = os.path.join(gs_resource_cache_dir, hashlib.md5(male_recording_url.encode()).hexdigest() + "." + ext) self.st_tm = start self.ed_tm = end self.target_loudness = vocal_loudness def log_info_name(self): return f"d_id={self.distinct_id}, vocal_url={self.vocal_url}" def rm_cache(self): if os.path.exists(self.tmp_dir): shutil.rmtree(self.tmp_dir) def init_gender_model(): """ 下载模型 :return: """ dst_model_dir = os.path.join(gs_model_dir, "voice_classification") if not os.path.exists(dst_model_dir): dst_zip_path = os.path.join(gs_model_dir, "models.zip") if not download2disk(gs_gender_models_url, dst_zip_path): svc_offline_logger.fatal(f"download gender_model err={gs_gender_models_url}") cmd = f"cd {gs_model_dir}; unzip {dst_zip_path}; mv models voice_classification; rm -f {dst_zip_path}" os.system(cmd) if not os.path.exists(dst_model_dir): svc_offline_logger.fatal(f"unzip {dst_zip_path} err") music_voice_pure_model = os.path.join(dst_model_dir, "voice_005_rec_v5.pth") music_voice_no_pure_model = os.path.join(dst_model_dir, "voice_10_v5.pth") gender_pure_model = os.path.join(dst_model_dir, "gender_8k_ratev5_v6_adam.pth") gender_no_pure_model = os.path.join(dst_model_dir, "gender_8k_v6_adam.pth") vc = VoiceClass(music_voice_pure_model, music_voice_no_pure_model, gender_pure_model, gender_no_pure_model) return vc def init_svc_model(): meisheng_env_prepare(logging, gs_model_dir) embed_model, hubert_model = load_model() return embed_model, hubert_model def download_volume_adjustment(): """ 下载音量调整工具 :return: """ volume_bin_path = os.path.join(gs_model_dir, "ebur128_tool") if not os.path.exists(volume_bin_path): if not download2disk(gs_volume_bin_url, volume_bin_path): svc_offline_logger.fatal(f"download volume_bin err={gs_volume_bin_url}") os.system(f"chmod +x {volume_bin_path}") def volume_adjustment(wav_path, target_loudness, out_path): """ 音量调整 :param wav_path: :param target_loudness: :param out_path: :return: """ volume_bin_path = os.path.join(gs_model_dir, "ebur128_tool") cmd = f"{volume_bin_path} {wav_path} {target_loudness} {out_path}" os.system(cmd) class SVCOnline: def __init__(self): st = time.time() self.gender_model = init_gender_model() self.embed_model, self.hubert_model = init_svc_model() download_volume_adjustment() download_volume_balanced() svc_offline_logger.info(f"svc init finished, sp = {time.time() - st}") def gender_process(self, worker_attr): st = time.time() gender, female_rate, is_pure = self.gender_model.process(worker_attr.vocal_path) svc_offline_logger.info( f"{worker_attr.vocal_url}, gender={gender}, female_rate={female_rate}, is_pure={is_pure}, " f"gender_process sp = {time.time() - st}") if gender == 0: gender = 'female' elif gender == 1: gender = 'male' elif female_rate > 0.5: gender = 'female' else: gender = 'male' svc_offline_logger.info(f"{worker_attr.vocal_url}, modified gender={gender}") # err = gs_err_code_success # if female_rate == -1: # err = gs_err_code_target_silence return gender, gs_err_code_success def process(self, worker_attr): gender, err = self.gender_process(worker_attr) if err != gs_err_code_success: return gender, err song_path = worker_attr.female_svc_source_path if gender == "male": song_path = worker_attr.male_svc_source_path params = {'gender': gender, 'tst': worker_attr.st_tm, "tnd": worker_attr.ed_tm, 'delay': 0, 'song_path': None} st = time.time() err_code = process_svc_online(song_path, worker_attr.vocal_path, worker_attr.target_wav_path, self.embed_model, self.hubert_model, params) svc_offline_logger.info(f"{worker_attr.vocal_url}, err_code={err_code} process svc sp = {time.time() - st}") return gender, err_code