Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F4880306
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
17 KB
Subscribers
None
View Options
diff --git a/AutoCoverTool/ref/so_vits_svc/inference/infer_tool.py b/AutoCoverTool/ref/so_vits_svc/inference/infer_tool.py
index 9539a97..628cbdf 100644
--- a/AutoCoverTool/ref/so_vits_svc/inference/infer_tool.py
+++ b/AutoCoverTool/ref/so_vits_svc/inference/infer_tool.py
@@ -1,342 +1,342 @@
import hashlib
import json
import logging
import os
import time
from pathlib import Path
import librosa
import maad
import numpy as np
# import onnxruntime
import parselmouth
import soundfile
import torch
import torchaudio
from hubert import hubert_model
import utils
from models import SynthesizerTrn
import copy
logging.getLogger('matplotlib').setLevel(logging.WARNING)
from mel_processing import spectrogram_torch, spec_to_mel_torch
def get_spec(audio):
    """Compute a spectrogram for ``audio`` via ``spectrogram_torch``.

    The input is echoed to stdout first (existing debug output).

    NOTE(review): the positional constants are presumably
    (n_fft=1280, sampling_rate=32000, hop=320, win=1280) -- confirm against
    ``mel_processing.spectrogram_torch``.
    """
    print(audio)
    return spectrogram_torch(audio, 1280, 32000, 320, 1280, center=False)
def read_temp(file_name, max_bytes=50 * 1024 * 1024, max_age=14 * 24 * 3600):
    """Load the JSON cache stored at ``file_name``.

    If the file does not exist it is created containing the marker
    ``{"info": "temp_dict"}`` and an empty dict is returned.

    If the file is larger than ``max_bytes``, entries whose ``"time"`` stamp
    is older than ``max_age`` seconds are dropped from the returned dict
    (the file itself is not rewritten here).

    Args:
        file_name: Path of the JSON cache file.
        max_bytes: File size above which stale entries are pruned.
        max_age: Maximum entry age in seconds before it is pruned.

    Returns:
        The cache dict. On any read/parse error a fresh
        ``{"info": "temp_dict"}`` dict is returned instead of raising.
    """
    if not os.path.exists(file_name):
        with open(file_name, "w") as f:
            f.write(json.dumps({"info": "temp_dict"}))
        return {}

    try:
        with open(file_name, "r") as f:
            data_dict = json.loads(f.read())
        if os.path.getsize(file_name) > max_bytes:
            f_name = file_name.replace("\\", "/").split("/")[-1]
            print(f"clean {f_name}")
            now = int(time.time())
            for wav_hash in list(data_dict.keys()):
                entry = data_dict[wav_hash]
                # The "info" marker (and any other non-record value) has no
                # timestamp; indexing it used to raise and wipe the cache.
                if not isinstance(entry, dict) or "time" not in entry:
                    continue
                if now - int(entry["time"]) > max_age:
                    del data_dict[wav_hash]
    except Exception as e:
        # Deliberate best effort: a corrupt cache is replaced, never fatal.
        print(e)
        print(f"{file_name} error,auto rebuild file")
        data_dict = {"info": "temp_dict"}
    return data_dict
def write_temp(file_name, data):
    """Overwrite ``file_name`` with the JSON serialization of ``data``."""
    with open(file_name, "w") as handle:
        serialized = json.dumps(data)
        handle.write(serialized)
def timeit(func):
    """Decorator that prints the wall-clock duration of each call to ``func``.

    The wrapper forwards all arguments and the return value unchanged, and
    keeps ``func``'s name and docstring via ``functools.wraps``.
    """
    import functools

    @functools.wraps(func)
    def run(*args, **kwargs):
        t = time.time()
        res = func(*args, **kwargs)
        print('executing \'%s\' costed %.3fs' % (func.__name__, time.time() - t))
        return res

    return run
def format_wav(audio_path):
    """Write a ``.wav`` copy of ``audio_path`` next to it unless it already is one.

    Non-wav input is loaded with librosa (mono, native sample rate) and
    written with soundfile under the same name with a ``.wav`` suffix.
    """
    src = Path(audio_path)
    if src.suffix != '.wav':
        samples, sample_rate = librosa.load(audio_path, mono=True, sr=None)
        soundfile.write(src.with_suffix(".wav"), samples, sample_rate)
def get_end_file(dir_path, end):
    """Recursively collect files under ``dir_path`` whose name ends with ``end``.

    Hidden files and hidden directories (leading ``.``) are skipped.
    Returned paths use forward slashes.
    """
    matches = []
    for root, dirs, files in os.walk(dir_path):
        # Prune in place so os.walk does not descend into hidden directories.
        dirs[:] = [d for d in dirs if d[0] != '.']
        matches.extend(
            os.path.join(root, name).replace("\\", "/")
            for name in files
            if name[0] != '.' and name.endswith(end)
        )
    return matches
def get_md5(content):
    """Return the hexadecimal MD5 digest of the bytes ``content``."""
    digest = hashlib.new("md5")
    digest.update(content)
    return digest.hexdigest()
def resize2d_f0(x, target_len):
    """Linearly resample the f0 contour ``x`` to ``target_len`` points.

    Values below 0.001 are set to NaN before interpolation and NaNs in the
    result are converted back to 0 by ``np.nan_to_num``.
    """
    contour = np.array(x)
    contour[contour < 0.001] = np.nan
    n_src = len(contour)
    sample_at = np.arange(0, n_src * target_len, n_src) / target_len
    stretched = np.interp(sample_at, np.arange(0, n_src), contour)
    return np.nan_to_num(stretched)
def get_f0(x, p_len, f0_up_key=0):
    """Extract a pitch contour from 16 kHz audio with Praat autocorrelation.

    Args:
        x: Audio samples handed to ``parselmouth.Sound(x, 16000)``.
        p_len: Number of frames the returned contours must have.
        f0_up_key: Transposition in semitones applied to the pitch.

    Returns:
        ``(f0_coarse, f0)``: ``f0_coarse`` is the mel-scaled pitch quantized
        to integers in ``[1, 255]``; ``f0`` is the transposed frequency
        contour, with 0 where Praat found no pitch.
    """
    time_step = 160 / 16000 * 1000  # 10 ms frame step, in milliseconds
    f0_min = 50
    f0_max = 1100
    f0_mel_min = 1127 * np.log(1 + f0_min / 700)
    f0_mel_max = 1127 * np.log(1 + f0_max / 700)

    f0 = parselmouth.Sound(x, 16000).to_pitch_ac(
        time_step=time_step / 1000, voicing_threshold=0.6,
        pitch_floor=f0_min, pitch_ceiling=f0_max).selected_array['frequency']

    # Force exactly p_len frames: truncate, or zero-pad on both sides.
    if len(f0) > p_len:
        f0 = f0[:p_len]
    pad_size = (p_len - len(f0) + 1) // 2
    if pad_size > 0 or p_len - len(f0) - pad_size > 0:
        f0 = np.pad(f0, [[pad_size, p_len - len(f0) - pad_size]], mode='constant')

    f0 *= pow(2, f0_up_key / 12)
    f0_mel = 1127 * np.log(1 + f0 / 700)
    voiced = f0_mel > 0
    f0_mel[voiced] = (f0_mel[voiced] - f0_mel_min) * 254 / (f0_mel_max - f0_mel_min) + 1
    f0_mel[f0_mel <= 1] = 1
    f0_mel[f0_mel > 255] = 255
    # np.int was removed in NumPy 1.24; it was an alias of the builtin int.
    f0_coarse = np.rint(f0_mel).astype(int)
    return f0_coarse, f0
def clean_pitch(input_pitch):
    """Flatten ``input_pitch`` to all ones when over 90% of it already equals 1.

    The array is modified in place and also returned.
    """
    ones_ratio = np.sum(input_pitch == 1) / len(input_pitch)
    if ones_ratio > 0.9:
        input_pitch[input_pitch != 1] = 1
    return input_pitch
def plt_pitch(input_pitch):
    """Return a float copy of ``input_pitch`` with every value equal to 1 set to NaN."""
    as_float = input_pitch.astype(float)
    as_float[as_float == 1] = np.nan
    return as_float
def f0_to_pitch(ff):
    """Convert frequency ``ff`` to ``69 + 12 * log2(ff / 440)`` (440 maps to 69)."""
    octaves_from_440 = np.log2(ff / 440)
    return 69 + 12 * octaves_from_440
def fill_a_to_b(a, b):
    """Pad list ``a`` in place with copies of ``a[0]`` until it is as long as ``b``.

    Does nothing when ``a`` is already at least as long as ``b``.
    """
    missing = len(b) - len(a)
    if missing > 0:
        a.extend([a[0]] * missing)
def mkdir(paths: list):
    """Create each directory in ``paths`` that does not exist yet (single level, via ``os.mkdir``)."""
    # filter() is lazy, so each existence check still runs just before its mkdir.
    for missing in filter(lambda p: not os.path.exists(p), paths):
        os.mkdir(missing)
class Svc(object):
    """Singing-voice conversion inference wrapper.

    Loads hyper-parameters from ``config_path``, a HuBERT-soft encoder from
    ``hubert_path`` and a ``SynthesizerTrn`` checkpoint from ``net_g_path``.
    ``infer`` converts an audio file to the requested speaker.
    """

    def __init__(self, net_g_path, config_path,
                 hubert_path="/data/prod/so_vits_models/models/hubert-soft-0d54a1f4.pt",
                 onnx=False):
        """Load the config, the HuBERT encoder and the synthesizer checkpoint.

        Args:
            net_g_path: Synthesizer checkpoint path. A path containing
                ``"half"`` selects fp16 inference when CUDA is available.
            config_path: Hyper-parameter file read by
                ``utils.get_hparams_from_file``.
            hubert_path: HuBERT-soft checkpoint path.
            onnx: ONNX mode flag; not implemented (``load_model`` raises).
        """
        self.onnx = onnx
        self.net_g_path = net_g_path
        self.hubert_path = hubert_path
        self.dev = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.net_g_ms = None
        self.hps_ms = utils.get_hparams_from_file(config_path)
        self.target_sample = self.hps_ms.data.sampling_rate
        self.hop_size = self.hps_ms.data.hop_length
        # Reverse lookup: speaker id -> speaker name.
        self.speakers = {sid: spk for spk, sid in self.hps_ms.spk.items()}
        self.spk2id = self.hps_ms.spk
        # Load HuBERT.
        self.hubert_soft = hubert_model.hubert_soft(hubert_path)
        if torch.cuda.is_available():
            self.hubert_soft = self.hubert_soft.cuda()
        self.load_model()

    def _use_half(self):
        """Return True when the checkpoint path contains "half" and CUDA is available."""
        return "half" in self.net_g_path and torch.cuda.is_available()

    def load_model(self):
        """Build ``SynthesizerTrn`` from the config and load its checkpoint.

        Raises:
            NotImplementedError: If the instance was created with ``onnx=True``.
        """
        if self.onnx:
            raise NotImplementedError
        self.net_g_ms = SynthesizerTrn(
            self.hps_ms.data.filter_length // 2 + 1,
            self.hps_ms.train.segment_size // self.hps_ms.data.hop_length,
            **self.hps_ms.model)
        _ = utils.load_checkpoint(self.net_g_path, self.net_g_ms, None)
        if self._use_half():
            _ = self.net_g_ms.half().eval().to(self.dev)
        else:
            _ = self.net_g_ms.eval().to(self.dev)

    def get_units(self, source, sr):
        """Run HuBERT on ``source`` and return its units.

        ``sr`` is accepted for interface compatibility but is not used here.
        """
        source = source.unsqueeze(0).to(self.dev)
        with torch.inference_mode():
            start = time.time()
            units = self.hubert_soft.units(source)
            use_time = time.time() - start
            print("hubert use time:{}".format(use_time))
            return units

    def get_unit_pitch(self, in_path, tran):
        """Load ``in_path`` and return ``(soft_units, f0, original_waveform)``.

        The audio is resampled to 16 kHz and down-mixed to one channel before
        HuBERT and pitch extraction. ``tran`` is the transposition in
        semitones passed to ``get_f0``.
        """
        source, sr = torchaudio.load(in_path)
        source_bak = copy.deepcopy(source)
        source = torchaudio.functional.resample(source, sr, 16000)
        # Down-mix multi-channel audio. The channel axis is dim 0; the earlier
        # check looked at the sample axis, so it was true for any real input.
        if len(source.shape) == 2 and source.shape[0] >= 2:
            source = torch.mean(source, dim=0).unsqueeze(0)
        soft = self.get_units(source, sr).squeeze(0).cpu().numpy()
        # The pitch contour gets twice as many frames as HuBERT produced;
        # infer() repeats the units by 2 to match.
        _, f0 = get_f0(source.cpu().numpy()[0], soft.shape[0] * 2, tran)
        return soft, f0, source_bak

    def infer(self, speaker_id, tran, raw_path, dev=False):
        """Convert the audio at ``raw_path`` to the voice of ``speaker_id``.

        Args:
            speaker_id: Speaker name (looked up in ``spk2id``) or numeric id.
            tran: Transposition in semitones.
            raw_path: Input audio accepted by ``torchaudio.load``.
            dev: Unused; kept for interface compatibility.

        Returns:
            ``(audio, length)``: the float waveform tensor and its sample count.
        """
        if isinstance(speaker_id, str):
            speaker_id = self.spk2id[speaker_id]
        sid = torch.LongTensor([int(speaker_id)]).to(self.dev).unsqueeze(0)
        soft, pitch, source = self.get_unit_pitch(raw_path, tran)
        f0 = torch.FloatTensor(clean_pitch(pitch)).unsqueeze(0).to(self.dev)
        if self._use_half():
            stn_tst = torch.HalfTensor(soft)
        else:
            stn_tst = torch.FloatTensor(soft)
        with torch.no_grad():
            x_tst = stn_tst.unsqueeze(0).to(self.dev)
            start = time.time()
            # Repeat each unit frame twice to match the f0 frame count.
            x_tst = torch.repeat_interleave(x_tst, repeats=2, dim=1).transpose(1, 2)
            audio = self.net_g_ms.infer(x_tst, f0=f0, g=sid)[0, 0].data.float()
            use_time = time.time() - start
            print("vits use time:{}".format(use_time))
        return audio, audio.shape[-1]
# class SvcONNXInferModel(object):
# def __init__(self, hubert_onnx, vits_onnx, config_path):
# self.config_path = config_path
# self.vits_onnx = vits_onnx
# self.hubert_onnx = hubert_onnx
# self.hubert_onnx_session = onnxruntime.InferenceSession(hubert_onnx, providers=['CUDAExecutionProvider', ])
# self.inspect_onnx(self.hubert_onnx_session)
# self.vits_onnx_session = onnxruntime.InferenceSession(vits_onnx, providers=['CUDAExecutionProvider', ])
# self.inspect_onnx(self.vits_onnx_session)
# self.hps_ms = utils.get_hparams_from_file(self.config_path)
# self.target_sample = self.hps_ms.data.sampling_rate
# self.feature_input = FeatureInput(self.hps_ms.data.sampling_rate, self.hps_ms.data.hop_length)
#
# @staticmethod
# def inspect_onnx(session):
# for i in session.get_inputs():
# print("name:{}\tshape:{}\tdtype:{}".format(i.name, i.shape, i.type))
# for i in session.get_outputs():
# print("name:{}\tshape:{}\tdtype:{}".format(i.name, i.shape, i.type))
#
# def infer(self, speaker_id, tran, raw_path):
# sid = np.array([int(speaker_id)], dtype=np.int64)
# soft, pitch = self.get_unit_pitch(raw_path, tran)
# pitch = np.expand_dims(pitch, axis=0).astype(np.int64)
# stn_tst = soft
# x_tst = np.expand_dims(stn_tst, axis=0)
# x_tst_lengths = np.array([stn_tst.shape[0]], dtype=np.int64)
# # 使用ONNX Runtime进行推理
# start = time.time()
# audio = self.vits_onnx_session.run(output_names=["audio"],
# input_feed={
# "hidden_unit": x_tst,
# "lengths": x_tst_lengths,
# "pitch": pitch,
# "sid": sid,
# })[0][0, 0]
# use_time = time.time() - start
# print("vits_onnx_session.run time:{}".format(use_time))
# audio = torch.from_numpy(audio)
# return audio, audio.shape[-1]
#
# def get_units(self, source, sr):
# source = torchaudio.functional.resample(source, sr, 16000)
# if len(source.shape) == 2 and source.shape[1] >= 2:
# source = torch.mean(source, dim=0).unsqueeze(0)
# source = source.unsqueeze(0)
# # 使用ONNX Runtime进行推理
# start = time.time()
# units = self.hubert_onnx_session.run(output_names=["embed"],
# input_feed={"source": source.numpy()})[0]
# use_time = time.time() - start
# print("hubert_onnx_session.run time:{}".format(use_time))
# return units
#
# def transcribe(self, source, sr, length, transform):
# feature_pit = self.feature_input.compute_f0(source, sr)
# feature_pit = feature_pit * 2 ** (transform / 12)
# feature_pit = resize2d_f0(feature_pit, length)
# coarse_pit = self.feature_input.coarse_f0(feature_pit)
# return coarse_pit
#
# def get_unit_pitch(self, in_path, tran):
# source, sr = torchaudio.load(in_path)
# soft = self.get_units(source, sr).squeeze(0)
# input_pitch = self.transcribe(source.numpy()[0], sr, soft.shape[0], tran)
# return soft, input_pitch
class RealTimeVC:
    """Chunked voice conversion that cross-fades consecutive chunks.

    Original note on this class: inputs and outputs are 1-D numpy audio
    waveform arrays.
    NOTE(review): ``process`` calls ``input_wav_path.seek(0)``, so the input
    is in fact expected to be a seekable file-like object -- confirm with
    callers.
    """

    def __init__(self):
        self.last_chunk = None  # tail of the previous output, prepended to the next input
        self.last_o = None  # previous full output, used for cross-fading
        self.chunk_len = 16000  # chunk length
        self.pre_len = 3840  # cross-fade length, a multiple of 640

    def process(self, svc_model, speaker_id, f_pitch_change, input_wav_path):
        """Convert one chunk and return the slice to play.

        Args:
            svc_model: Object exposing ``infer(speaker_id, tran, path)`` that
                returns ``(audio_tensor, length)``.
            speaker_id: Target speaker forwarded to ``svc_model.infer``.
            f_pitch_change: Transposition forwarded to ``svc_model.infer``.
            input_wav_path: Seekable wav source readable by ``torchaudio.load``.

        Returns:
            A numpy slice of converted audio: the last ``chunk_len`` samples
            on the first call, afterwards the cross-faded span
            ``[chunk_len:2 * chunk_len]``.
        """
        # The module's visible imports do not include io; import it locally.
        import io

        audio, sr = torchaudio.load(input_wav_path)
        audio = audio.cpu().numpy()[0]
        if self.last_chunk is None:
            # First chunk: nothing to cross-fade with yet.
            input_wav_path.seek(0)
            audio, _ = svc_model.infer(speaker_id, f_pitch_change, input_wav_path)
            audio = audio.cpu().numpy()
            self.last_chunk = audio[-self.pre_len:]
            self.last_o = audio
            return audio[-self.chunk_len:]

        # Prepend the previous tail so the model sees overlapping context.
        temp_wav = io.BytesIO()
        audio = np.concatenate([self.last_chunk, audio])
        soundfile.write(temp_wav, audio, sr, format="wav")
        temp_wav.seek(0)
        audio, _ = svc_model.infer(speaker_id, f_pitch_change, temp_wav)
        audio = audio.cpu().numpy()
        ret = maad.util.crossfade(self.last_o, audio, self.pre_len)
        self.last_chunk = audio[-self.pre_len:]
        self.last_o = audio
        return ret[self.chunk_len:2 * self.chunk_len]
diff --git a/AutoCoverTool/svc_inference/webui.py b/AutoCoverTool/svc_inference/webui.py
index 0cb72e5..42d40fc 100644
--- a/AutoCoverTool/svc_inference/webui.py
+++ b/AutoCoverTool/svc_inference/webui.py
@@ -1,88 +1,101 @@
"""
构建唱歌音色转换网页(基于3.0)
要求:
1. 音频上传
2. 推理
3. 下载
"""
import os
import time
import glob
import shutil
import librosa
import soundfile
import gradio as gr
from online.common import update_db
from ref.so_vits_svc.inference_main import *
# NOTE(review): presumably scratch space for this UI -- not referenced in the visible code.
gs_tmp_dir = "/tmp/svc_inference"
# Root directory globbed for "*/*pth" model checkpoints by generate_svc_file().
gs_model_dir = "/data/prod/so_vits_models/3.0"
# Where generate_svc_file() writes one "<model>.wav" per checkpoint; main() lists these files.
gs_test_wav_dir = "/data/prod/so_vits_models/test_svc_file/3.0"
# config.json located next to this file; passed to inf() for every model.
gs_config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.json")
def generate_svc_file():
    """Render the bundled test clip once with every model checkpoint.

    For each ``*/*pth`` file under ``gs_model_dir`` this writes
    ``<model_name>.wav`` into ``gs_test_wav_dir`` via ``inf``, skipping
    outputs that already exist, and prints progress after each model.

    :return: None
    """
    os.makedirs(gs_test_wav_dir, exist_ok=True)
    test_wav_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../res/syz_test.wav")
    model_path_list = glob.glob(os.path.join(gs_model_dir, "*/*pth"))
    st = time.time()
    for idx, model_path in enumerate(model_path_list):
        # Strip the extension. str.split(".pth", "") raised TypeError because
        # the second argument of split() is the integer maxsplit.
        model_name = model_path.strip().split("/")[-1].replace(".pth", "")
        dst_path = os.path.join(gs_test_wav_dir, "{}.wav".format(model_name))
        if not os.path.exists(dst_path):
            inf(model_path, gs_config_path, test_wav_path, dst_path, "prod")
        # The elapsed time was passed to format() without a placeholder.
        print("now_per={}/{}, sp={}".format(idx, len(model_path_list), time.time() - st))
def update_state(gender, user_id):
    """Set ``gender`` for ``user_id`` in ``av_db.av_svc_model``.

    Args:
        gender: Gender code; coerced with ``int()`` so only a number can
            reach the SQL text.
        user_id: User identifier; backslashes and double quotes are escaped
            before being embedded in the statement.

    Raises:
        ValueError, TypeError: If ``gender`` cannot be converted to ``int``.

    NOTE(review): ``update_db`` only receives a finished SQL string here. If
    it can take bound parameters, prefer that over manual escaping, which
    assumes backslash escapes are enabled on the server.
    """
    gender_value = int(gender)
    safe_user_id = str(user_id).replace("\\", "\\\\").replace("\"", "\\\"")
    sql = "update av_db.av_svc_model set gender={} where user_id=\"{}\"".format(gender_value, safe_user_id)
    update_db(sql)
# 按钮控制
def click_male(user_id):
    """Button callback for marking ``user_id`` as male; currently a no-op stub."""


def click_female(user_id):
    """Button callback for marking ``user_id`` as female; currently a no-op stub."""


def click_delete(user_id):
    """Button callback for deleting ``user_id``'s entry; currently a no-op stub."""
def main():
    """Generate the per-model test clips, then build and launch the Gradio UI.

    NOTE(review): ``model_select``, ``svc`` and ``gs_model_list_dropdown``
    are not defined in the visible part of this file; presumably they come
    from ``from ref.so_vits_svc.inference_main import *`` -- confirm.
    """
    # header
    st = time.time()
    generate_svc_file()
    print("generate svc sp={}".format(time.time() - st))

    app = gr.Blocks()
    with app:
        # Header introduction
        gr.Markdown(value="""
### 人声质量评价
作者:starmaker音视频
""")
        # List display
        # 1. Each row has an audio player plus gender and delete buttons
        svc_files = glob.glob(os.path.join(gs_test_wav_dir, "*wav"))
        for svc_file in svc_files:
            user_id = svc_file.split("/")[-1].replace(".wav", "")
            # The clip to play is the component's value; `source` selects the
            # input device (upload/microphone), not a file path.
            gr.Audio(value=svc_file)

            male_gender_btn = gr.Button("male")
            female_gender_btn = gr.Button("female")
            del_btn = gr.Button("delete")
            # Gradio `inputs` must be components, not plain strings, so the
            # id is bound as a default argument. This also avoids the
            # late-binding closure pitfall inside the loop.
            male_gender_btn.click(fn=lambda uid=user_id: click_male(uid))
            female_gender_btn.click(fn=lambda uid=user_id: click_female(uid))
            del_btn.click(fn=lambda uid=user_id: click_delete(uid))

        refresh_btn = gr.Button("refresh_model_list")
        refresh_btn.click(fn=model_select, inputs=[], outputs=gs_model_list_dropdown)
        # Audio input box
        input_audio = gr.inputs.Audio(label="input")
        vc_transform = gr.Number(label="变调(整数,可以正负,半音数量,升高八度就是12)", value=0)
        gen_btn = gr.Button("generate", variant="primary")
        output_audio = gr.outputs.Audio(label="output", type='filepath')
        gen_btn.click(fn=svc, inputs=[input_audio, gs_model_list_dropdown, vc_transform], outputs=output_audio)
    # A queue with concurrency 1 lets only one job run on the server at a time.
    app.queue(concurrency_count=1, max_size=2044).launch(server_name="0.0.0.0", inbrowser=True, quiet=True,
                                                         server_port=7860)
# Launch the web UI only when this file is run as a script, not on import.
if __name__ == '__main__':
    main()
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sun, Jan 12, 08:32 (1 d, 15 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1347179
Default Alt Text
(17 KB)
Attached To
R350 av_svc
Event Timeline
Log In to Comment