Page MenuHomePhabricator

No OneTemporary

diff --git a/AutoCoverTool/ref/so_vits_svc/inference/infer_tool.py b/AutoCoverTool/ref/so_vits_svc/inference/infer_tool.py
index 9539a97..628cbdf 100644
--- a/AutoCoverTool/ref/so_vits_svc/inference/infer_tool.py
+++ b/AutoCoverTool/ref/so_vits_svc/inference/infer_tool.py
@@ -1,342 +1,342 @@
import hashlib
import json
import logging
import os
import time
from pathlib import Path
import librosa
import maad
import numpy as np
# import onnxruntime
import parselmouth
import soundfile
import torch
import torchaudio
from hubert import hubert_model
import utils
from models import SynthesizerTrn
import copy
logging.getLogger('matplotlib').setLevel(logging.WARNING)
from mel_processing import spectrogram_torch, spec_to_mel_torch
def get_spec(audio):
    """Return the spectrogram of ``audio`` computed by ``spectrogram_torch``.

    The input is echoed to stdout first, then passed to
    ``spectrogram_torch`` with the fixed arguments ``1280, 32000, 320, 1280``
    and ``center=False`` (presumably n_fft, sample rate, hop size and window
    size -- confirm against ``mel_processing``).
    """
    print(audio)
    return spectrogram_torch(audio, 1280, 32000, 320, 1280, center=False)
def read_temp(file_name, max_bytes=50 * 1024 * 1024, max_age=14 * 24 * 3600):
    """Load the JSON cache stored at ``file_name``.

    If the file does not exist it is created containing the marker
    ``{"info": "temp_dict"}`` and an empty dict is returned.

    When the file on disk is larger than ``max_bytes``, records whose
    ``"time"`` field is more than ``max_age`` seconds in the past are dropped
    from the returned dict (the file itself is not rewritten here).

    Args:
        file_name: Path of the cache file.
        max_bytes: File size above which old records are pruned.
        max_age: Maximum record age in seconds before it is pruned.

    Returns:
        The cache dict, or ``{"info": "temp_dict"}`` if the file could not
        be read or parsed.
    """
    if not os.path.exists(file_name):
        with open(file_name, "w") as f:
            f.write(json.dumps({"info": "temp_dict"}))
        return {}

    try:
        with open(file_name, "r") as f:
            data_dict = json.loads(f.read())
        if os.path.getsize(file_name) > max_bytes:
            f_name = file_name.replace("\\", "/").split("/")[-1]
            print(f"clean {f_name}")
            now = int(time.time())
            for wav_hash in list(data_dict.keys()):
                entry = data_dict[wav_hash]
                # The "info" marker (a plain string) and any entry without a
                # timestamp are not prunable records.  Indexing them with
                # ["time"] used to raise and discard the whole cache.
                if not isinstance(entry, dict) or "time" not in entry:
                    continue
                if now - int(entry["time"]) > max_age:
                    del data_dict[wav_hash]
    except (OSError, ValueError, TypeError, AttributeError, KeyError) as e:
        # Best effort: an unreadable or corrupt cache is replaced by a fresh one.
        print(e)
        print(f"{file_name} error,auto rebuild file")
        data_dict = {"info": "temp_dict"}
    return data_dict
def write_temp(file_name, data):
    """Overwrite ``file_name`` with the JSON serialisation of ``data``."""
    with open(file_name, "w") as handle:
        payload = json.dumps(data)
        handle.write(payload)
def timeit(func):
    """Decorator that prints the wall-clock duration of every call to ``func``.

    The wrapper forwards all arguments, returns the wrapped function's result
    unchanged, and keeps its ``__name__``/``__doc__`` via ``functools.wraps``.
    """
    # Local import so the decorator does not rely on a module-level import.
    import functools

    @functools.wraps(func)
    def run(*args, **kwargs):
        # perf_counter is monotonic, so the measurement cannot go negative
        # when the system clock is adjusted.
        t = time.perf_counter()
        res = func(*args, **kwargs)
        print('executing \'%s\' costed %.3fs' % (func.__name__, time.perf_counter() - t))
        return res

    return run
def format_wav(audio_path):
    """Write a ``.wav`` copy of ``audio_path`` next to it.

    Files that already have the ``.wav`` suffix are left alone.  Anything
    else is loaded with librosa as mono at its native sample rate and written
    with soundfile to the same path with the suffix replaced by ``.wav``.
    """
    src = Path(audio_path)
    if src.suffix == '.wav':
        return
    samples, sample_rate = librosa.load(audio_path, mono=True, sr=None)
    soundfile.write(src.with_suffix(".wav"), samples, sample_rate)
def get_end_file(dir_path, end):
    """Recursively collect files under ``dir_path`` whose name ends with ``end``.

    Files and directories whose name starts with ``.`` are skipped.  Returned
    paths use forward slashes.
    """
    matches = []
    for root, dirs, files in os.walk(dir_path):
        # Prune hidden directories in place so os.walk never descends into them.
        dirs[:] = [d for d in dirs if d[0] != '.']
        matches.extend(
            os.path.join(root, name).replace("\\", "/")
            for name in files
            if name[0] != '.' and name.endswith(end)
        )
    return matches
def get_md5(content):
    """Return the hexadecimal MD5 digest of the bytes-like ``content``."""
    digest = hashlib.md5(content)
    return digest.hexdigest()
def resize2d_f0(x, target_len):
    """Resample the f0 contour ``x`` to ``target_len`` points by linear interpolation.

    Values below ``0.001`` are treated as missing: they are set to NaN before
    interpolation and any NaN left in the result is turned back into 0.

    Args:
        x: 1-D sequence of f0 values (integers are accepted).
        target_len: Number of points in the returned contour.

    Returns:
        A float ``numpy.ndarray`` of length ``target_len``.
    """
    # Force a float copy: NaN cannot be stored in an integer array, so integer
    # input used to fail on the assignment below.
    source = np.array(x, dtype=float)
    source[source < 0.001] = np.nan
    positions = np.arange(0, len(source) * target_len, len(source)) / target_len
    target = np.interp(positions, np.arange(0, len(source)), source)
    return np.nan_to_num(target)
def get_f0(x, p_len, f0_up_key=0):
    """Extract an f0 contour from ``x`` and quantise it to coarse pitch bins.

    The pitch is estimated with ``parselmouth``'s ``to_pitch_ac`` treating
    ``x`` as 16 kHz audio, with a 10 ms step and a 50-1100 Hz search range.
    The contour is truncated or zero-padded on both sides to ``p_len``
    frames, transposed by ``f0_up_key`` semitones, then mapped on a mel scale
    to integers in ``[1, 255]``.

    Args:
        x: Mono waveform samples.
        p_len: Number of frames the returned contours must have.
        f0_up_key: Transposition in semitones (may be negative).

    Returns:
        ``(f0_coarse, f0)``: the integer bins and the transposed f0 values.
    """
    time_step = 160 / 16000 * 1000
    f0_min = 50
    f0_max = 1100
    f0_mel_min = 1127 * np.log(1 + f0_min / 700)
    f0_mel_max = 1127 * np.log(1 + f0_max / 700)

    f0 = parselmouth.Sound(x, 16000).to_pitch_ac(
        time_step=time_step / 1000, voicing_threshold=0.6,
        pitch_floor=f0_min, pitch_ceiling=f0_max).selected_array['frequency']
    if len(f0) > p_len:
        f0 = f0[:p_len]
    # Split the missing frames between the head and the tail of the contour.
    pad_size = (p_len - len(f0) + 1) // 2
    if (pad_size > 0 or p_len - len(f0) - pad_size > 0):
        f0 = np.pad(f0, [[pad_size, p_len - len(f0) - pad_size]], mode='constant')

    f0 *= pow(2, f0_up_key / 12)
    f0_mel = 1127 * np.log(1 + f0 / 700)
    # Frames with f0 == 0 keep mel 0 here and are clamped to bin 1 below.
    f0_mel[f0_mel > 0] = (f0_mel[f0_mel > 0] - f0_mel_min) * 254 / (f0_mel_max - f0_mel_min) + 1
    f0_mel[f0_mel <= 1] = 1
    f0_mel[f0_mel > 255] = 255
    # ``np.int`` was only an alias of the builtin ``int`` and no longer exists
    # in NumPy >= 1.24.
    f0_coarse = np.rint(f0_mel).astype(int)
    return f0_coarse, f0
def clean_pitch(input_pitch):
    """Flatten ``input_pitch`` to all ones when over 90% of it already equals 1.

    The array is modified in place and also returned.
    """
    is_one = input_pitch == 1
    if np.sum(is_one) / len(input_pitch) > 0.9:
        input_pitch[~is_one] = 1
    return input_pitch
def plt_pitch(input_pitch):
    """Return a float copy of ``input_pitch`` with every value equal to 1 replaced by NaN.

    The input array itself is left untouched.
    """
    as_float = input_pitch.astype(float)
    return np.where(as_float == 1, np.nan, as_float)
def f0_to_pitch(ff):
    """Convert a frequency in Hz to a pitch number where 440 Hz maps to 69 and an octave spans 12."""
    semitones_from_a4 = 12 * np.log2(ff / 440)
    return 69 + semitones_from_a4
def fill_a_to_b(a, b):
    """Pad list ``a`` in place with copies of its first element until it is as long as ``b``.

    Nothing happens when ``a`` is already at least as long as ``b``.
    """
    missing = len(b) - len(a)
    if missing > 0:
        a.extend([a[0]] * missing)
def mkdir(paths: list):
    """Create every directory in ``paths`` that does not exist yet.

    Missing parent directories are created as well, and a directory that
    appears between the existence check and the creation is tolerated.
    Paths that already exist are skipped.
    """
    for path in paths:
        if not os.path.exists(path):
            os.makedirs(path, exist_ok=True)
class Svc(object):
    """Voice-conversion inference wrapper: HuBERT unit encoder plus a ``SynthesizerTrn`` generator.

    Construction reads the hyper-parameters from ``config_path``, loads the
    HuBERT soft model from ``hubert_path`` and the generator checkpoint from
    ``net_g_path``.  Models are moved to CUDA when it is available.
    """

    def __init__(self, net_g_path, config_path,
                 hubert_path="/data/prod/so_vits_models/models/hubert-soft-0d54a1f4.pt",
                 onnx=False):
        """
        Args:
            net_g_path: Generator checkpoint path.  If it contains "half" and
                CUDA is available, the generator runs in half precision.
            config_path: Hyper-parameter file read by ``utils.get_hparams_from_file``.
            hubert_path: HuBERT soft model checkpoint path.
            onnx: ONNX mode flag; ``load_model`` raises ``NotImplementedError``
                when it is true.
        """
        self.onnx = onnx
        self.net_g_path = net_g_path
        self.hubert_path = hubert_path
        self.dev = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.net_g_ms = None
        self.hps_ms = utils.get_hparams_from_file(config_path)
        self.target_sample = self.hps_ms.data.sampling_rate
        self.hop_size = self.hps_ms.data.hop_length
        # Reverse lookup: speaker id -> speaker name.
        self.speakers = {sid: spk for spk, sid in self.hps_ms.spk.items()}
        self.spk2id = self.hps_ms.spk
        # Load HuBERT.
        self.hubert_soft = hubert_model.hubert_soft(hubert_path)
        if torch.cuda.is_available():
            self.hubert_soft = self.hubert_soft.cuda()
        self.load_model()

    def _use_half(self):
        """Return True when the checkpoint path contains "half" and CUDA is available."""
        return "half" in self.net_g_path and torch.cuda.is_available()

    def load_model(self):
        """Build the generator from the config, load its checkpoint and put it in eval mode.

        Raises:
            NotImplementedError: if the instance was created with ``onnx=True``.
        """
        if self.onnx:
            # The ONNX generator is not wired up in this file.
            raise NotImplementedError
        self.net_g_ms = SynthesizerTrn(
            self.hps_ms.data.filter_length // 2 + 1,
            self.hps_ms.train.segment_size // self.hps_ms.data.hop_length,
            **self.hps_ms.model)
        _ = utils.load_checkpoint(self.net_g_path, self.net_g_ms, None)
        if self._use_half():
            _ = self.net_g_ms.half().eval().to(self.dev)
        else:
            _ = self.net_g_ms.eval().to(self.dev)

    def get_units(self, source, sr):
        """Run HuBERT on ``source`` and return its unit features.

        ``sr`` is accepted for interface compatibility and is not used.
        """
        source = source.unsqueeze(0).to(self.dev)
        with torch.inference_mode():
            start = time.time()
            units = self.hubert_soft.units(source)
            use_time = time.time() - start
            print("hubert use time:{}".format(use_time))
            return units

    def get_unit_pitch(self, in_path, tran):
        """Load ``in_path`` and compute its HuBERT units and f0 contour.

        Args:
            in_path: Audio file path or file-like object for ``torchaudio.load``.
            tran: Transposition in semitones forwarded to ``get_f0``.

        Returns:
            ``(soft, f0, source)``: the unit features as a numpy array, the
            f0 contour with two frames per unit frame, and the waveform
            exactly as loaded (before resampling and down-mixing).
        """
        source, sr = torchaudio.load(in_path)
        # Nothing below mutates the loaded tensor in place (resample and mean
        # return new tensors), so keeping a reference is enough; the previous
        # deepcopy duplicated the whole waveform for no benefit.
        loaded_source = source
        source = torchaudio.functional.resample(source, sr, 16000)
        # torchaudio.load returns (channels, frames); down-mix when there is
        # more than one channel.  The old test looked at the frame axis.
        if source.dim() == 2 and source.shape[0] > 1:
            source = torch.mean(source, dim=0).unsqueeze(0)
        soft = self.get_units(source, sr).squeeze(0).cpu().numpy()
        f0_coarse, f0 = get_f0(source.cpu().numpy()[0], soft.shape[0] * 2, tran)
        return soft, f0, loaded_source

    def infer(self, speaker_id, tran, raw_path, dev=False):
        """Convert the audio at ``raw_path`` to the voice of ``speaker_id``.

        Args:
            speaker_id: Speaker name (looked up in ``spk2id``) or numeric id.
            tran: Transposition in semitones.
            raw_path: Audio file path or file-like object.
            dev: Unused; kept for interface compatibility.

        Returns:
            ``(audio, length)``: the generated waveform as a float tensor and
            its number of samples.
        """
        if isinstance(speaker_id, str):
            speaker_id = self.spk2id[speaker_id]
        sid = torch.LongTensor([int(speaker_id)]).to(self.dev).unsqueeze(0)
        soft, pitch, source = self.get_unit_pitch(raw_path, tran)
        # NOTE(review): ``pitch`` is the raw f0 contour (second value of
        # get_f0), while clean_pitch looks for the value 1 -- presumably a
        # no-op here; confirm.
        f0 = torch.FloatTensor(clean_pitch(pitch)).unsqueeze(0).to(self.dev)
        stn_tst = torch.HalfTensor(soft) if self._use_half() else torch.FloatTensor(soft)
        with torch.no_grad():
            x_tst = stn_tst.unsqueeze(0).to(self.dev)
            start = time.time()
            # f0 has two frames per unit frame (see get_unit_pitch), so every
            # unit frame is repeated twice to match.
            x_tst = torch.repeat_interleave(x_tst, repeats=2, dim=1).transpose(1, 2)
            audio = self.net_g_ms.infer(x_tst, f0=f0, g=sid)[0, 0].data.float()
            use_time = time.time() - start
            print("vits use time:{}".format(use_time))
        return audio, audio.shape[-1]
# class SvcONNXInferModel(object):
# def __init__(self, hubert_onnx, vits_onnx, config_path):
# self.config_path = config_path
# self.vits_onnx = vits_onnx
# self.hubert_onnx = hubert_onnx
# self.hubert_onnx_session = onnxruntime.InferenceSession(hubert_onnx, providers=['CUDAExecutionProvider', ])
# self.inspect_onnx(self.hubert_onnx_session)
# self.vits_onnx_session = onnxruntime.InferenceSession(vits_onnx, providers=['CUDAExecutionProvider', ])
# self.inspect_onnx(self.vits_onnx_session)
# self.hps_ms = utils.get_hparams_from_file(self.config_path)
# self.target_sample = self.hps_ms.data.sampling_rate
# self.feature_input = FeatureInput(self.hps_ms.data.sampling_rate, self.hps_ms.data.hop_length)
#
# @staticmethod
# def inspect_onnx(session):
# for i in session.get_inputs():
# print("name:{}\tshape:{}\tdtype:{}".format(i.name, i.shape, i.type))
# for i in session.get_outputs():
# print("name:{}\tshape:{}\tdtype:{}".format(i.name, i.shape, i.type))
#
# def infer(self, speaker_id, tran, raw_path):
# sid = np.array([int(speaker_id)], dtype=np.int64)
# soft, pitch = self.get_unit_pitch(raw_path, tran)
# pitch = np.expand_dims(pitch, axis=0).astype(np.int64)
# stn_tst = soft
# x_tst = np.expand_dims(stn_tst, axis=0)
# x_tst_lengths = np.array([stn_tst.shape[0]], dtype=np.int64)
# # 使用ONNX Runtime进行推理
# start = time.time()
# audio = self.vits_onnx_session.run(output_names=["audio"],
# input_feed={
# "hidden_unit": x_tst,
# "lengths": x_tst_lengths,
# "pitch": pitch,
# "sid": sid,
# })[0][0, 0]
# use_time = time.time() - start
# print("vits_onnx_session.run time:{}".format(use_time))
# audio = torch.from_numpy(audio)
# return audio, audio.shape[-1]
#
# def get_units(self, source, sr):
# source = torchaudio.functional.resample(source, sr, 16000)
# if len(source.shape) == 2 and source.shape[1] >= 2:
# source = torch.mean(source, dim=0).unsqueeze(0)
# source = source.unsqueeze(0)
# # 使用ONNX Runtime进行推理
# start = time.time()
# units = self.hubert_onnx_session.run(output_names=["embed"],
# input_feed={"source": source.numpy()})[0]
# use_time = time.time() - start
# print("hubert_onnx_session.run time:{}".format(use_time))
# return units
#
# def transcribe(self, source, sr, length, transform):
# feature_pit = self.feature_input.compute_f0(source, sr)
# feature_pit = feature_pit * 2 ** (transform / 12)
# feature_pit = resize2d_f0(feature_pit, length)
# coarse_pit = self.feature_input.coarse_f0(feature_pit)
# return coarse_pit
#
# def get_unit_pitch(self, in_path, tran):
# source, sr = torchaudio.load(in_path)
# soft = self.get_units(source, sr).squeeze(0)
# input_pitch = self.transcribe(source.numpy()[0], sr, soft.shape[0], tran)
# return soft, input_pitch
class RealTimeVC:
    """Chunked voice conversion that cross-fades each converted chunk with the previous one."""

    def __init__(self):
        self.last_chunk = None  # tail of the previous converted chunk
        self.last_o = None  # previous converted chunk in full
        self.chunk_len = 16000  # chunk length in samples
        self.pre_len = 3840  # cross-fade length, a multiple of 640

    def process(self, svc_model, speaker_id, f_pitch_change, input_wav_path):
        """Convert one chunk of audio and return a 1-D numpy waveform.

        Args:
            svc_model: Object exposing ``infer(speaker_id, tran, raw_path)``
                returning ``(audio_tensor, length)``.
            speaker_id: Target speaker forwarded to ``svc_model.infer``.
            f_pitch_change: Transposition forwarded to ``svc_model.infer``.
            input_wav_path: Seekable file-like object readable by
                ``torchaudio.load`` (``seek(0)`` is called on it for the
                first chunk).

        Returns:
            The last ``chunk_len`` samples of the converted audio for the
            first chunk; afterwards the ``[chunk_len, 2 * chunk_len)`` slice
            of the cross-faded result.
        """
        # ``io`` is not among this module's imports; import it here so that
        # BytesIO below does not raise NameError.
        import io

        audio, sr = torchaudio.load(input_wav_path)
        audio = audio.cpu().numpy()[0]
        if self.last_chunk is None:
            # First chunk: rewind the stream and convert it as-is.
            input_wav_path.seek(0)
            audio, _length = svc_model.infer(speaker_id, f_pitch_change, input_wav_path)
            audio = audio.cpu().numpy()
            self.last_chunk = audio[-self.pre_len:]
            self.last_o = audio
            return audio[-self.chunk_len:]

        # Later chunks: prepend the saved tail, convert through an in-memory
        # wav, then cross-fade with the previous output.
        audio = np.concatenate([self.last_chunk, audio])
        temp_wav = io.BytesIO()
        soundfile.write(temp_wav, audio, sr, format="wav")
        temp_wav.seek(0)
        audio, _length = svc_model.infer(speaker_id, f_pitch_change, temp_wav)
        audio = audio.cpu().numpy()
        ret = maad.util.crossfade(self.last_o, audio, self.pre_len)
        self.last_chunk = audio[-self.pre_len:]
        self.last_o = audio
        return ret[self.chunk_len:2 * self.chunk_len]
diff --git a/AutoCoverTool/svc_inference/webui.py b/AutoCoverTool/svc_inference/webui.py
index 0cb72e5..42d40fc 100644
--- a/AutoCoverTool/svc_inference/webui.py
+++ b/AutoCoverTool/svc_inference/webui.py
@@ -1,88 +1,101 @@
"""
构建唱歌音色转换网页(基于3.0)
要求:
1. 音频上传
2. 推理
3. 下载
"""
import os
import time
import glob
import shutil
import librosa
import soundfile
import gradio as gr
from online.common import update_db
from ref.so_vits_svc.inference_main import *
# Scratch directory path; not referenced in the visible part of this file.
gs_tmp_dir = "/tmp/svc_inference"
# Root directory that is globbed for "*/*pth" model checkpoints.
gs_model_dir = "/data/prod/so_vits_models/3.0"
# Directory holding one rendered test wav per model; listed by the web page.
gs_test_wav_dir = "/data/prod/so_vits_models/test_svc_file/3.0"
# config.json located next to this file; passed to ``inf`` for every model.
gs_config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.json")
def generate_svc_file():
    """Render the bundled test wav through every model that has no output yet.

    Every checkpoint matching ``<gs_model_dir>/*/*pth`` is run through
    ``inf`` on ``../res/syz_test.wav`` and the result is written to
    ``<gs_test_wav_dir>/<model name>.wav``.  Models whose output file already
    exists are skipped.

    :return: None
    """
    if not os.path.exists(gs_test_wav_dir):
        os.makedirs(gs_test_wav_dir)
    test_wav_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../res/syz_test.wav")
    model_path_list = glob.glob(os.path.join(gs_model_dir, "*/*pth"))
    st = time.time()
    for idx, model_path in enumerate(model_path_list):
        # File name without the ".pth" extension.  The previous
        # ``.split(".pth", "")`` raised TypeError because the second argument
        # of str.split must be an integer.
        model_name = os.path.basename(model_path.strip()).replace(".pth", "")
        dst_path = os.path.join(gs_test_wav_dir, "{}.wav".format(model_name))
        if not os.path.exists(dst_path):
            inf(model_path, gs_config_path, test_wav_path, dst_path, "prod")
        # The elapsed time was passed to format() without a placeholder and
        # never printed.
        print("now_per={}/{},sp={}".format(idx, len(model_path_list), time.time() - st))
def update_state(gender, user_id):
    """Set the ``gender`` column of the ``av_db.av_svc_model`` row matching ``user_id``.

    The statement is assembled as text and handed to ``update_db``, so both
    values are validated first to keep them from altering the statement.

    Args:
        gender: Integer-like gender code.
        user_id: User identifier; must not contain ``"`` or a backslash.

    Raises:
        ValueError: if ``gender`` is not integer-like or ``user_id`` contains
            a double quote or a backslash.
        TypeError: if ``gender`` cannot be converted with ``int()``.
    """
    gender_value = int(gender)
    user_id_text = str(user_id)
    if "\"" in user_id_text or "\\" in user_id_text:
        raise ValueError("invalid user_id: {!r}".format(user_id))
    # NOTE(review): switch to a parameterised query if update_db supports
    # bound parameters -- its signature is not visible from this file.
    sql = "update av_db.av_svc_model set gender={} where user_id=\"{}\"".format(gender_value, user_id_text)
    update_db(sql)
# Button callbacks
def click_male(user_id):
    """Handler for the "male" button of ``user_id``; currently does nothing."""


def click_female(user_id):
    """Handler for the "female" button of ``user_id``; currently does nothing."""


def click_delete(user_id):
    """Handler for the delete button of ``user_id``; currently does nothing."""
def main():
    """Render the test wav for every model, then build and launch the Gradio page.

    The page lists one audio player per file in ``gs_test_wav_dir`` with
    male / female / delete buttons, followed by a model-refresh button and an
    upload -> convert -> download form.  The server listens on 0.0.0.0:7860.
    """
    # header
    st = time.time()
    generate_svc_file()
    print("generate svc sp={}".format(time.time() - st))

    app = gr.Blocks()
    with app:
        # Page header.
        gr.Markdown(value="""
        ### 人声质量评价
        作者:starmaker音视频
        """)
        # Listing: each row has the audio plus gender / delete buttons.
        svc_files = glob.glob(os.path.join(gs_test_wav_dir, "*wav"))
        for svc_file in svc_files:
            user_id = svc_file.split("/")[-1].replace(".wav", "")
            # The file to play goes in ``value``; ``source`` selects the input
            # widget ("upload" / "microphone"), not a path.
            gr.Audio(value=svc_file, label=user_id)

            male_gender_btn = gr.Button("male")
            female_gender_btn = gr.Button("female")
            # This button was mislabelled "female".
            del_btn = gr.Button("delete")
            # ``inputs`` must be Gradio components, not plain strings, so the
            # id is bound into each callback.  The default argument captures
            # the current loop value (avoids late binding).
            male_gender_btn.click(lambda uid=user_id: click_male(uid))
            female_gender_btn.click(lambda uid=user_id: click_female(uid))
            del_btn.click(lambda uid=user_id: click_delete(uid))

        # NOTE(review): model_select, gs_model_list_dropdown and svc are not
        # defined in the visible part of this file -- presumably provided by
        # the star import from inference_main; confirm.
        refresh_btn = gr.Button("refresh_model_list")
        refresh_btn.click(fn=model_select, inputs=[], outputs=gs_model_list_dropdown)
        # Audio input box.
        input_audio = gr.inputs.Audio(label="input")
        vc_transform = gr.Number(label="变调(整数,可以正负,半音数量,升高八度就是12)", value=0)
        gen_btn = gr.Button("generate", variant="primary")
        output_audio = gr.outputs.Audio(label="output", type='filepath')
        gen_btn.click(fn=svc, inputs=[input_audio, gs_model_list_dropdown, vc_transform], outputs=output_audio)
    # concurrency_count=1 makes the server process one job at a time.
    app.queue(concurrency_count=1, max_size=2044).launch(server_name="0.0.0.0", inbrowser=True, quiet=True,
                                                         server_port=7860)
# Start the web UI only when this file is executed as a script.
if __name__ == '__main__':
    main()

File Metadata

Mime Type
text/x-diff
Expires
Sun, Jan 12, 08:32 (1 d, 11 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1347179
Default Alt Text
(17 KB)

Event Timeline