def get_spec(audio):
    """Return the spectrogram of ``audio`` computed by ``spectrogram_torch``.

    The call uses the fixed positional settings 1280, 32000, 320, 1280 and
    ``center=False`` (presumably n_fft, sampling rate, hop size and window
    size -- confirm against ``mel_processing.spectrogram_torch``).
    """
    # The previous debug print of the whole input tensor was removed: it
    # flooded stdout on every call and had no effect on the result.
    return spectrogram_torch(audio, 1280, 32000, 320, 1280, center=False)


def _is_stale_entry(entry, now, max_age):
    """Return True when ``entry`` is a dict whose "time" stamp is older than ``max_age`` seconds."""
    if not isinstance(entry, dict) or "time" not in entry:
        # Marker entries such as {"info": "temp_dict"} carry no timestamp and
        # must survive the cleanup.
        return False
    try:
        return now - int(entry["time"]) > max_age
    except (TypeError, ValueError):
        return False


def read_temp(file_name, max_size=50 * 1024 * 1024, max_age=14 * 24 * 3600):
    """Load the JSON cache stored at ``file_name``.

    :param file_name: path of the JSON cache file.
    :param max_size: file size in bytes above which stale entries are pruned
        from the returned dict (default 50 MiB).
    :param max_age: age in seconds after which a timestamped entry counts as
        stale (default 14 days).
    :return: the cache dict. When the file does not exist it is created with
        the marker content ``{"info": "temp_dict"}`` and ``{}`` is returned.
        When the file cannot be read or parsed, ``{"info": "temp_dict"}`` is
        returned.

    The pruned dict is only returned, not written back; callers persist it
    with ``write_temp``.
    """
    if not os.path.exists(file_name):
        with open(file_name, "w") as f:
            f.write(json.dumps({"info": "temp_dict"}))
        return {}
    try:
        with open(file_name, "r") as f:
            data_dict = json.loads(f.read())
        if not isinstance(data_dict, dict):
            raise ValueError("cache root is not a JSON object")
        if os.path.getsize(file_name) > max_size:
            f_name = file_name.replace("\\", "/").split("/")[-1]
            print(f"clean {f_name}")
            now = int(time.time())
            # Previously the loop indexed every value with ["time"], which
            # raised on the "info" marker string and wiped the whole cache.
            data_dict = {
                key: entry
                for key, entry in data_dict.items()
                if not _is_stale_entry(entry, now, max_age)
            }
    except (OSError, ValueError) as e:
        print(e)
        print(f"{file_name} error,auto rebuild file")
        data_dict = {"info": "temp_dict"}
    return data_dict


def write_temp(file_name, data):
    """Serialize ``data`` as JSON and overwrite ``file_name`` with it."""
    with open(file_name, "w") as f:
        f.write(json.dumps(data))


def timeit(func):
    """Decorator that prints the wall-clock time of every call to ``func``."""
    # Local import: functools is not among the file-level imports.
    from functools import wraps

    @wraps(func)  # keep __name__/__doc__ of the wrapped function
    def run(*args, **kwargs):
        t = time.time()
        res = func(*args, **kwargs)
        print('executing \'%s\' costed %.3fs' % (func.__name__, time.time() - t))
        return res

    return run


def format_wav(audio_path):
    """Write a ``.wav`` copy next to ``audio_path`` unless it already has the ``.wav`` suffix.

    The audio is loaded with ``librosa`` as mono at its native sample rate and
    written with ``soundfile`` to the same path with the suffix replaced.
    """
    if Path(audio_path).suffix == '.wav':
        return
    raw_audio, raw_sample_rate = librosa.load(audio_path, mono=True, sr=None)
    soundfile.write(Path(audio_path).with_suffix(".wav"), raw_audio, raw_sample_rate)


def get_end_file(dir_path, end):
    """Recursively collect files under ``dir_path`` whose name ends with ``end``.

    Files and directories whose name starts with a dot are skipped. Returned
    paths use forward slashes.
    """
    file_lists = []
    for root, dirs, files in os.walk(dir_path):
        # Prune hidden directories in place so os.walk does not descend into them.
        dirs[:] = [d for d in dirs if not d.startswith('.')]
        for f_file in files:
            if f_file.startswith('.'):
                continue
            if f_file.endswith(end):
                file_lists.append(os.path.join(root, f_file).replace("\\", "/"))
    return file_lists


def get_md5(content):
    """Return the hexadecimal MD5 digest of the bytes ``content``."""
    return hashlib.md5(content).hexdigest()


def resize2d_f0(x, target_len):
    """Linearly resample the f0 sequence ``x`` to ``target_len`` points.

    Values below 0.001 are replaced by NaN before interpolation and every NaN
    in the result is mapped back to 0.
    """
    # Force a float copy: assigning NaN into an integer array raises.
    source = np.array(x, dtype=float)
    source[source < 0.001] = np.nan
    target = np.interp(np.arange(0, len(source) * target_len, len(source)) / target_len,
                       np.arange(0, len(source)), source)
    return np.nan_to_num(target)


def get_f0(x, p_len, f0_up_key=0):
    """Extract f0 from the waveform ``x`` with Praat autocorrelation.

    :param x: waveform samples, passed to ``parselmouth.Sound`` at 16000 Hz.
    :param p_len: number of frames the f0 track is cut or zero-padded to.
    :param f0_up_key: pitch shift in semitones applied to the track.
    :return: ``(f0_coarse, f0)`` -- the mel-scaled f0 quantized to integers in
        [1, 255], and the shifted f0 track itself.
    """
    time_step = 160 / 16000 * 1000
    f0_min = 50
    f0_max = 1100
    f0_mel_min = 1127 * np.log(1 + f0_min / 700)
    f0_mel_max = 1127 * np.log(1 + f0_max / 700)

    f0 = parselmouth.Sound(x, 16000).to_pitch_ac(
        time_step=time_step / 1000, voicing_threshold=0.6,
        pitch_floor=f0_min, pitch_ceiling=f0_max).selected_array['frequency']
    if len(f0) > p_len:
        f0 = f0[:p_len]
    # Pad on both sides so the track has exactly p_len frames.
    pad_size = (p_len - len(f0) + 1) // 2
    if pad_size > 0 or p_len - len(f0) - pad_size > 0:
        f0 = np.pad(f0, [[pad_size, p_len - len(f0) - pad_size]], mode='constant')

    f0 *= pow(2, f0_up_key / 12)
    f0_mel = 1127 * np.log(1 + f0 / 700)
    f0_mel[f0_mel > 0] = (f0_mel[f0_mel > 0] - f0_mel_min) * 254 / (f0_mel_max - f0_mel_min) + 1
    f0_mel[f0_mel <= 1] = 1
    f0_mel[f0_mel > 255] = 255
    # np.int was removed in NumPy 1.24; the builtin int is the same dtype.
    f0_coarse = np.rint(f0_mel).astype(int)
    return f0_coarse, f0


def clean_pitch(input_pitch):
    """Set every value of ``input_pitch`` to 1 when more than 90% of it already equals 1.

    The array is modified in place and also returned.
    """
    if len(input_pitch) == 0:
        # Nothing to inspect; also avoids a 0/0 ratio.
        return input_pitch
    num_nan = np.sum(input_pitch == 1)
    if num_nan / len(input_pitch) > 0.9:
        input_pitch[input_pitch != 1] = 1
    return input_pitch


def plt_pitch(input_pitch):
    """Return a float copy of ``input_pitch`` with every value equal to 1 replaced by NaN."""
    input_pitch = input_pitch.astype(float)
    input_pitch[input_pitch == 1] = np.nan
    return input_pitch


def f0_to_pitch(ff):
    """Convert frequency ``ff`` to ``69 + 12 * log2(ff / 440)``."""
    return 69 + 12 * np.log2(ff / 440)


def fill_a_to_b(a, b):
    """Extend list ``a`` in place with copies of ``a[0]`` until it is as long as ``b``.

    Raises ``IndexError`` when ``a`` is empty and ``b`` is not.
    """
    missing = len(b) - len(a)
    if missing > 0:
        a.extend([a[0]] * missing)


def mkdir(paths: list):
    """Create every directory in ``paths`` that does not exist yet, including parents."""
    for path in paths:
        if not os.path.exists(path):
            # makedirs also creates missing parents; exist_ok guards against a
            # directory appearing between the check and the call.
            os.makedirs(path, exist_ok=True)
class Svc(object):
    """Voice-conversion inference wrapper around a HuBERT encoder and a SynthesizerTrn generator."""

    def __init__(self, net_g_path, config_path,
                 hubert_path="/data/prod/so_vits_models/models/hubert-soft-0d54a1f4.pt",
                 onnx=False):
        """Load the configuration, the HuBERT encoder and the generator checkpoint.

        :param net_g_path: path of the generator checkpoint; a path containing
            "half" selects fp16 inference when CUDA is available.
        :param config_path: path of the hyper-parameter file read by
            ``utils.get_hparams_from_file``.
        :param hubert_path: path of the HuBERT soft-unit checkpoint.
        :param onnx: ONNX inference is not implemented; ``True`` makes
            ``load_model`` raise ``NotImplementedError``.
        """
        self.onnx = onnx
        self.net_g_path = net_g_path
        self.hubert_path = hubert_path
        self.dev = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.net_g_ms = None
        self.hps_ms = utils.get_hparams_from_file(config_path)
        self.target_sample = self.hps_ms.data.sampling_rate
        self.hop_size = self.hps_ms.data.hop_length
        # Reverse lookup: speaker id -> speaker name.
        self.speakers = {sid: spk for spk, sid in self.hps_ms.spk.items()}
        self.spk2id = self.hps_ms.spk
        # Load HuBERT on the same device as everything else (self.dev is CUDA
        # exactly when torch.cuda.is_available()).
        self.hubert_soft = hubert_model.hubert_soft(hubert_path).to(self.dev)
        self.load_model()

    def _use_half(self):
        """Return True when the checkpoint path contains "half" and CUDA is available."""
        return "half" in self.net_g_path and torch.cuda.is_available()

    def load_model(self):
        """Build the generator from the config, load its checkpoint and switch it to eval mode.

        :raises NotImplementedError: when the instance was created with ``onnx=True``.
        """
        if self.onnx:
            # The ONNX generator (SynthesizerTrnForONNX) is not available here.
            raise NotImplementedError
        self.net_g_ms = SynthesizerTrn(
            self.hps_ms.data.filter_length // 2 + 1,
            self.hps_ms.train.segment_size // self.hps_ms.data.hop_length,
            **self.hps_ms.model)
        _ = utils.load_checkpoint(self.net_g_path, self.net_g_ms, None)
        if self._use_half():
            _ = self.net_g_ms.half().eval().to(self.dev)
        else:
            _ = self.net_g_ms.eval().to(self.dev)

    def get_units(self, source, sr):
        """Run the HuBERT encoder on ``source`` and return its soft units.

        :param source: waveform tensor; a batch dimension is prepended before
            it is moved to the inference device.
        :param sr: unused; kept for interface compatibility.
        """
        source = source.unsqueeze(0).to(self.dev)
        with torch.inference_mode():
            start = time.time()
            units = self.hubert_soft.units(source)
            use_time = time.time() - start
            print("hubert use time:{}".format(use_time))
            return units

    def get_unit_pitch(self, in_path, tran):
        """Load ``in_path`` and return ``(soft_units, f0, original_waveform)``.

        The audio is resampled to 16000 Hz and down-mixed to one channel
        before the units and the f0 track (shifted by ``tran`` semitones) are
        extracted. The returned waveform is a copy of the audio as loaded.
        """
        source, sr = torchaudio.load(in_path)
        source_bak = copy.deepcopy(source)
        source = torchaudio.functional.resample(source, sr, 16000)
        # torchaudio.load returns (channels, frames) by default, so the channel
        # count is shape[0]; the old check looked at the frame axis.
        if len(source.shape) == 2 and source.shape[0] >= 2:
            source = torch.mean(source, dim=0).unsqueeze(0)
        soft = self.get_units(source, sr).squeeze(0).cpu().numpy()
        # Two f0 frames per unit frame, matching the repeat_interleave in infer().
        f0_coarse, f0 = get_f0(source.cpu().numpy()[0], soft.shape[0] * 2, tran)
        return soft, f0, source_bak

    def infer(self, speaker_id, tran, raw_path, dev=False):
        """Convert the audio at ``raw_path`` to the voice of ``speaker_id``.

        :param speaker_id: speaker name (looked up in ``spk2id``) or numeric id.
        :param tran: pitch shift in semitones.
        :param raw_path: input accepted by ``torchaudio.load``.
        :param dev: unused; kept for interface compatibility.
        :return: ``(audio, n_samples)`` -- the generated float waveform tensor
            and the size of its last dimension.
        """
        if isinstance(speaker_id, str):
            speaker_id = self.spk2id[speaker_id]
        sid = torch.LongTensor([int(speaker_id)]).to(self.dev).unsqueeze(0)
        soft, pitch, _source = self.get_unit_pitch(raw_path, tran)
        f0 = torch.FloatTensor(clean_pitch(pitch)).unsqueeze(0).to(self.dev)
        if self._use_half():
            stn_tst = torch.HalfTensor(soft)
        else:
            stn_tst = torch.FloatTensor(soft)
        # NOTE: the spectrum-conditioned path (get_spec + net_g_ms.infer_v1) is
        # disabled; only units and f0 feed the generator.
        with torch.no_grad():
            x_tst = stn_tst.unsqueeze(0).to(self.dev)
            start = time.time()
            # Repeat every unit frame twice so units align with the f0 frames.
            x_tst = torch.repeat_interleave(x_tst, repeats=2, dim=1).transpose(1, 2)
            audio = self.net_g_ms.infer(x_tst, f0=f0, g=sid)[0, 0].data.float()
            use_time = time.time() - start
            print("vits use time:{}".format(use_time))
        return audio, audio.shape[-1]


# class SvcONNXInferModel(object):
#     def __init__(self, hubert_onnx, vits_onnx, config_path):
#         self.config_path = config_path
#         self.vits_onnx = vits_onnx
#         self.hubert_onnx = hubert_onnx
#         self.hubert_onnx_session = onnxruntime.InferenceSession(hubert_onnx, providers=['CUDAExecutionProvider', ])
#         self.inspect_onnx(self.hubert_onnx_session)
#         self.vits_onnx_session = onnxruntime.InferenceSession(vits_onnx, providers=['CUDAExecutionProvider', ])
#         self.inspect_onnx(self.vits_onnx_session)
#         self.hps_ms = utils.get_hparams_from_file(self.config_path)
#         self.target_sample = self.hps_ms.data.sampling_rate
#         self.feature_input = FeatureInput(self.hps_ms.data.sampling_rate, self.hps_ms.data.hop_length)
#
#     @staticmethod
#     def inspect_onnx(session):
#         for i in session.get_inputs():
#
#             print("name:{}\tshape:{}\tdtype:{}".format(i.name, i.shape, i.type))
#         for i in session.get_outputs():
#             print("name:{}\tshape:{}\tdtype:{}".format(i.name, i.shape, i.type))
#
#     def infer(self, speaker_id, tran, raw_path):
#         sid = np.array([int(speaker_id)], dtype=np.int64)
#         soft, pitch = self.get_unit_pitch(raw_path, tran)
#         pitch = np.expand_dims(pitch, axis=0).astype(np.int64)
#         stn_tst = soft
#         x_tst = np.expand_dims(stn_tst, axis=0)
#         x_tst_lengths = np.array([stn_tst.shape[0]], dtype=np.int64)
#         # run inference with ONNX Runtime
#         start = time.time()
#         audio = self.vits_onnx_session.run(output_names=["audio"],
#                                            input_feed={
#                                                "hidden_unit": x_tst,
#                                                "lengths": x_tst_lengths,
#                                                "pitch": pitch,
#                                                "sid": sid,
#                                            })[0][0, 0]
#         use_time = time.time() - start
#         print("vits_onnx_session.run time:{}".format(use_time))
#         audio = torch.from_numpy(audio)
#         return audio, audio.shape[-1]
#
#     def get_units(self, source, sr):
#         source = torchaudio.functional.resample(source, sr, 16000)
#         if len(source.shape) == 2 and source.shape[1] >= 2:
#             source = torch.mean(source, dim=0).unsqueeze(0)
#         source = source.unsqueeze(0)
#         # run inference with ONNX Runtime
#         start = time.time()
#         units = self.hubert_onnx_session.run(output_names=["embed"],
#                                              input_feed={"source": source.numpy()})[0]
#         use_time = time.time() - start
#         print("hubert_onnx_session.run time:{}".format(use_time))
#         return units
#
#     def transcribe(self, source, sr, length, transform):
#         feature_pit = self.feature_input.compute_f0(source, sr)
#         feature_pit = feature_pit * 2 ** (transform / 12)
#         feature_pit = resize2d_f0(feature_pit, length)
#         coarse_pit = self.feature_input.coarse_f0(feature_pit)
#         return coarse_pit
#
#     def get_unit_pitch(self, in_path, tran):
#         source, sr = torchaudio.load(in_path)
#         soft = self.get_units(source, sr).squeeze(0)
#         input_pitch = self.transcribe(source.numpy()[0], sr, soft.shape[0], tran)
#         return soft, input_pitch


class RealTimeVC:
    """Chunk-by-chunk voice conversion that cross-fades consecutive outputs.

    Inputs and outputs are one-dimensional numpy audio waveform arrays.
    """

    def __init__(self):
        self.last_chunk = None  # tail of the previous converted output
        self.last_o = None  # previous converted output, used for the cross-fade
        self.chunk_len = 16000  # chunk length
        self.pre_len = 3840  # cross-fade length, a multiple of 640

    def process(self, svc_model, speaker_id, f_pitch_change, input_wav_path):
        """Convert one chunk of audio and return the slice to play back.

        :param svc_model: object exposing ``infer(speaker_id, tran, raw_path)``
            that returns ``(audio_tensor, n_samples)``.
        :param speaker_id: target speaker forwarded to ``svc_model.infer``.
        :param f_pitch_change: pitch shift forwarded to ``svc_model.infer``.
        :param input_wav_path: input accepted by ``torchaudio.load``; when it
            is a file-like object it is rewound before being read again.
        :return: on the first call, the last ``chunk_len`` samples of the
            converted audio; afterwards, samples ``chunk_len`` to
            ``2 * chunk_len`` of the cross-faded result.
        """
        # Local import: io is not among the file-level imports, so the former
        # module-level reference to io.BytesIO raised NameError.
        import io

        audio, sr = torchaudio.load(input_wav_path)
        audio = audio.cpu().numpy()[0]
        if self.last_chunk is None:
            # torchaudio.load consumed the stream; rewind it for infer(). Plain
            # paths have no seek() and need no rewinding.
            if hasattr(input_wav_path, "seek"):
                input_wav_path.seek(0)
            audio, _ = svc_model.infer(speaker_id, f_pitch_change, input_wav_path)
            audio = audio.cpu().numpy()
            self.last_chunk = audio[-self.pre_len:]
            self.last_o = audio
            return audio[-self.chunk_len:]

        # Prepend the previous tail so the new output overlaps the old one.
        audio = np.concatenate([self.last_chunk, audio])
        temp_wav = io.BytesIO()
        soundfile.write(temp_wav, audio, sr, format="wav")
        temp_wav.seek(0)
        audio, _ = svc_model.infer(speaker_id, f_pitch_change, temp_wav)
        audio = audio.cpu().numpy()
        ret = maad.util.crossfade(self.last_o, audio, self.pre_len)
        self.last_chunk = audio[-self.pre_len:]
        self.last_o = audio
        return ret[self.chunk_len:2 * self.chunk_len]
# Web page for singing-voice timbre conversion (based on 3.0).
# Requirements:
#   1. audio upload
#   2. inference
#   3. download

gs_tmp_dir = "/tmp/svc_inference"
gs_model_dir = "/data/prod/so_vits_models/3.0"
gs_test_wav_dir = "/data/prod/so_vits_models/test_svc_file/3.0"
gs_config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.json")


def generate_svc_file():
    """Render the bundled test clip with every model that has no rendering yet.

    Looks for ``*/*pth`` checkpoints under ``gs_model_dir`` and writes
    ``<model_name>.wav`` into ``gs_test_wav_dir`` via ``inf`` for each model
    whose output file is missing.
    """
    if not os.path.exists(gs_test_wav_dir):
        os.makedirs(gs_test_wav_dir)
    test_wav_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../res/syz_test.wav")
    model_path_list = glob.glob(os.path.join(gs_model_dir, "*/*pth"))
    st = time.time()
    for idx, model_path in enumerate(model_path_list):
        # The previous `.split(".pth", "")` raised TypeError (the second
        # argument of str.split is maxsplit); strip the suffix instead.
        model_name = model_path.strip().split("/")[-1].replace(".pth", "")
        dst_path = os.path.join(gs_test_wav_dir, "{}.wav".format(model_name))
        if not os.path.exists(dst_path):
            inf(model_path, gs_config_path, test_wav_path, dst_path, "prod")
        # The format string had two placeholders for three values, so the
        # elapsed time was silently dropped.
        print("now_per={}/{},sp={}".format(idx, len(model_path_list), time.time() - st))


def update_state(gender, user_id):
    """Set the ``gender`` column of the ``av_svc_model`` row belonging to ``user_id``.

    :param gender: value convertible to ``int``.
    :param user_id: identifier compared against the ``user_id`` column.
    :raises ValueError: when ``gender`` cannot be converted to ``int``.
    """
    # The statement is assembled by string formatting, so neutralize the
    # characters that could break out of the quoted literal.
    # NOTE(review): switch to a parameterized query if update_db supports one.
    gender_value = int(gender)
    safe_user_id = str(user_id).replace("\\", "\\\\").replace('"', '\\"')
    sql = "update av_db.av_svc_model set gender={} where user_id=\"{}\"".format(gender_value, safe_user_id)
    update_db(sql)


# Button handlers
def click_male(user_id):
    """Handler of the "male" button; currently a no-op."""
    pass


def click_female(user_id):
    """Handler of the "female" button; currently a no-op."""
    pass


def click_delete(user_id):
    """Handler of the "delete" button; currently a no-op."""
    pass


def main():
    """Render the test clips, build the Gradio page and launch the server."""
    # header
    st = time.time()
    generate_svc_file()
    print("generate svc sp={}".format(time.time() - st))

    app = gr.Blocks()
    with app:
        # Page introduction
        gr.Markdown(value="""
        ### 人声质量评价
        作者:starmaker音视频
        """)
        # Listing: one row per rendered clip with audio, gender and delete buttons
        svc_files = glob.glob(os.path.join(gs_test_wav_dir, "*wav"))
        for svc_file in svc_files:
            user_id = svc_file.split("/")[-1].replace(".wav", "")
            # `source` selects upload/microphone; the file to play goes in `value`.
            gr.Audio(value=svc_file)
            # Event inputs must be components, so wrap the plain string in a
            # State; this also binds the id of this row to its own buttons.
            user_state = gr.State(user_id)
            male_gender_btn = gr.Button("male")
            female_gender_btn = gr.Button("female")
            del_btn = gr.Button("delete")  # was mislabelled "female"
            male_gender_btn.click(click_male, inputs=[user_state])
            female_gender_btn.click(click_female, inputs=[user_state])
            del_btn.click(click_delete, inputs=[user_state])

        # NOTE(review): model_select, gs_model_list_dropdown and svc are not
        # defined in the visible source; presumably they come from the
        # wildcard import of ref.so_vits_svc.inference_main -- confirm.
        refresh_btn = gr.Button("refresh_model_list")
        refresh_btn.click(fn=model_select, inputs=[], outputs=gs_model_list_dropdown)

        # Audio input box
        input_audio = gr.inputs.Audio(label="input")
        vc_transform = gr.Number(label="变调(整数,可以正负,半音数量,升高八度就是12)", value=0)
        gen_btn = gr.Button("generate", variant="primary")

        output_audio = gr.outputs.Audio(label="output", type='filepath')
        gen_btn.click(fn=svc, inputs=[input_audio, gs_model_list_dropdown, vc_transform], outputs=output_audio)
    # A queue with a single worker keeps only one job running on the server at a time.
    app.queue(concurrency_count=1, max_size=2044).launch(server_name="0.0.0.0", inbrowser=True, quiet=True,
                                                         server_port=7860)


if __name__ == '__main__':
    main()