Page MenuHomePhabricator

No OneTemporary

diff --git a/AutoCoverTool/ref/so_vits_svc/inference/infer_tool.py b/AutoCoverTool/ref/so_vits_svc/inference/infer_tool.py
index 2bfa86d..2f29b6f 100644
--- a/AutoCoverTool/ref/so_vits_svc/inference/infer_tool.py
+++ b/AutoCoverTool/ref/so_vits_svc/inference/infer_tool.py
@@ -1,327 +1,340 @@
import hashlib
import json
import logging
import os
import time
from pathlib import Path
import librosa
import maad
import numpy as np
# import onnxruntime
import parselmouth
import soundfile
import torch
import torchaudio
from hubert import hubert_model
import utils
from models import SynthesizerTrn
-
+import copy
logging.getLogger('matplotlib').setLevel(logging.WARNING)
+from mel_processing import spectrogram_torch, spec_to_mel_torch
+
+
def get_spec(audio, n_fft=1280, sampling_rate=32000, hop_size=320, win_size=1280):
    """Return the spectrogram of ``audio`` computed by ``spectrogram_torch``.

    The defaults reproduce the values that were previously hard-coded here.

    Args:
        audio: waveform tensor forwarded unchanged to ``spectrogram_torch``.
        n_fft, sampling_rate, hop_size, win_size: forwarded positionally, in
            this order, to ``spectrogram_torch``.
            NOTE(review): the names assume the usual VITS ``mel_processing``
            signature -- confirm against that module.

    Returns:
        Whatever ``spectrogram_torch`` returns (called with ``center=False``).
    """
    # The former debug ``print`` of the full tensor was removed: it flooded
    # stdout on every call.
    return spectrogram_torch(audio, n_fft, sampling_rate, hop_size, win_size, center=False)
def read_temp(file_name, max_bytes=50 * 1024 * 1024, max_age_s=14 * 24 * 3600):
    """Load the JSON cache stored at ``file_name``, creating it if missing.

    Args:
        file_name: path of the JSON cache file.
        max_bytes: when the file is larger than this, stale entries are
            dropped from the returned dict (default 50 MiB).
        max_age_s: an entry is stale when its ``"time"`` value is more than
            this many seconds in the past (default 14 days).

    Returns:
        ``{}`` when the file had to be created, the parsed dict otherwise, or
        ``{"info": "temp_dict"}`` when the file could not be read or parsed.
    """
    if not os.path.exists(file_name):
        with open(file_name, "w") as f:
            f.write(json.dumps({"info": "temp_dict"}))
        return {}
    try:
        with open(file_name, "r") as f:
            data_dict = json.loads(f.read())
        if os.path.getsize(file_name) > max_bytes:
            f_name = file_name.replace("\\", "/").split("/")[-1]
            print(f"clean {f_name}")
            now = int(time.time())
            for wav_hash in list(data_dict.keys()):
                entry = data_dict[wav_hash]
                # Skip bookkeeping values such as "info": "temp_dict". Indexing
                # them with ["time"] used to raise and wipe the whole cache.
                if not isinstance(entry, dict) or "time" not in entry:
                    continue
                if now - int(entry["time"]) > max_age_s:
                    del data_dict[wav_hash]
    except Exception as e:
        # Deliberate best-effort: a corrupt cache is rebuilt, not fatal.
        print(e)
        print(f"{file_name} error,auto rebuild file")
        data_dict = {"info": "temp_dict"}
    return data_dict
def write_temp(file_name, data):
    """Overwrite ``file_name`` with the JSON serialization of ``data``."""
    with open(file_name, "w") as cache_file:
        payload = json.dumps(data)
        cache_file.write(payload)
def timeit(func):
    """Decorator that prints how long each call to ``func`` took.

    The wrapper forwards all arguments and returns ``func``'s result
    unchanged. ``functools.wraps`` keeps the wrapped function's name and
    docstring on the returned wrapper.
    """
    import functools  # local import: not among this file's top-level imports

    @functools.wraps(func)
    def run(*args, **kwargs):
        t = time.time()
        res = func(*args, **kwargs)
        print('executing \'%s\' costed %.3fs' % (func.__name__, time.time() - t))
        return res

    return run
def format_wav(audio_path):
    """Write a ``.wav`` sibling of ``audio_path`` unless its suffix is already ``.wav``.

    The audio is loaded with librosa as mono at its native sample rate
    (``sr=None``) and written with soundfile. Returns ``None``.
    """
    src = Path(audio_path)
    if src.suffix != '.wav':
        samples, native_sr = librosa.load(audio_path, mono=True, sr=None)
        soundfile.write(src.with_suffix(".wav"), samples, native_sr)
def get_end_file(dir_path, end):
    """Recursively collect files under ``dir_path`` whose name ends with ``end``.

    Files and directories whose name starts with ``.`` are ignored. Returned
    paths use forward slashes.
    """
    matches = []
    for root, dirs, files in os.walk(dir_path):
        # Prune hidden directories in place so os.walk does not descend into them.
        dirs[:] = [name for name in dirs if not name.startswith('.')]
        matches.extend(
            os.path.join(root, name).replace("\\", "/")
            for name in files
            if not name.startswith('.') and name.endswith(end)
        )
    return matches
def get_md5(content):
    """Return the hexadecimal MD5 digest of the bytes-like ``content``."""
    digest = hashlib.md5(content)
    return digest.hexdigest()
def resize2d_f0(x, target_len):
    """Resample the sequence ``x`` to ``target_len`` points by linear interpolation.

    Values below 0.001 are replaced by NaN before interpolating, and any NaN
    left in the result is turned back into 0 by ``np.nan_to_num``.

    Args:
        x: 1-D sequence of numbers (ints are accepted).
        target_len: number of output samples.

    Returns:
        1-D float ndarray of length ``target_len``.
    """
    # Force a float copy: an integer array cannot hold NaN, so the masked
    # assignment below would raise for integer input.
    source = np.array(x, dtype=float)
    source[source < 0.001] = np.nan
    positions = np.arange(0, len(source) * target_len, len(source)) / target_len
    target = np.interp(positions, np.arange(0, len(source)), source)
    return np.nan_to_num(target)
def get_f0(x, p_len, f0_up_key=0):
    """Estimate F0 with Praat (parselmouth) and quantize it to 1..255.

    Args:
        x: waveform handed to ``parselmouth.Sound`` with a 16000 Hz rate.
        p_len: number of frames the returned contours must have; the track is
            truncated or zero-padded on both sides to this length.
        f0_up_key: transposition in semitones (scales F0 by ``2 ** (key / 12)``).

    Returns:
        ``(f0_coarse, f0)``: the integer mel-scaled contour clipped to
        ``[1, 255]``, and the transposed F0 contour it was derived from.
    """
    time_step = 160 / 16000 * 1000
    f0_min = 50
    f0_max = 1100
    f0_mel_min = 1127 * np.log(1 + f0_min / 700)
    f0_mel_max = 1127 * np.log(1 + f0_max / 700)

    f0 = parselmouth.Sound(x, 16000).to_pitch_ac(
        time_step=time_step / 1000, voicing_threshold=0.6,
        pitch_floor=f0_min, pitch_ceiling=f0_max).selected_array['frequency']
    if len(f0) > p_len:
        f0 = f0[:p_len]
    pad_size = (p_len - len(f0) + 1) // 2
    if pad_size > 0 or p_len - len(f0) - pad_size > 0:
        f0 = np.pad(f0, [[pad_size, p_len - len(f0) - pad_size]], mode='constant')

    f0 *= pow(2, f0_up_key / 12)
    f0_mel = 1127 * np.log(1 + f0 / 700)
    voiced = f0_mel > 0
    f0_mel[voiced] = (f0_mel[voiced] - f0_mel_min) * 254 / (f0_mel_max - f0_mel_min) + 1
    f0_mel[f0_mel <= 1] = 1
    f0_mel[f0_mel > 255] = 255
    # ``np.int`` was only an alias of the builtin ``int`` and was removed in
    # NumPy 1.24; ``int`` gives the same dtype on every NumPy version.
    f0_coarse = np.rint(f0_mel).astype(int)
    return f0_coarse, f0
def clean_pitch(input_pitch):
    """Flatten ``input_pitch`` to all ones when over 90% of it already equals 1.

    The array is modified in place and also returned.
    """
    ones_count = np.sum(input_pitch == 1)
    ones_ratio = ones_count / len(input_pitch)
    if ones_ratio > 0.9:
        not_one = input_pitch != 1
        input_pitch[not_one] = 1
    return input_pitch
def plt_pitch(input_pitch):
    """Return a float copy of ``input_pitch`` with every entry equal to 1 replaced by NaN."""
    values = input_pitch.astype(float)
    return np.where(values == 1, np.nan, values)
def f0_to_pitch(ff):
    """Map a frequency in Hz to ``69 + 12 * log2(ff / 440)`` (so 440 Hz maps to 69)."""
    semitones_from_a4 = 12 * np.log2(ff / 440)
    return 69 + semitones_from_a4
def fill_a_to_b(a, b):
    """Pad list ``a`` in place with copies of ``a[0]`` until it is as long as ``b``.

    Nothing happens when ``a`` is already at least as long as ``b``.
    """
    missing = len(b) - len(a)
    if missing > 0:
        a.extend([a[0]] * missing)
def mkdir(paths: list):
    """Create each directory in ``paths`` that does not exist yet (non-recursive)."""
    # The generator is lazy, so each existence check runs right before its mkdir.
    for missing in (p for p in paths if not os.path.exists(p)):
        os.mkdir(missing)
class Svc(object):
    """Inference wrapper pairing a HuBERT soft-unit extractor with a ``SynthesizerTrn`` model."""

    def __init__(self, net_g_path, config_path, hubert_path="data/models/hubert-soft-0d54a1f4.pt",
                 onnx=False):
        """Load the hyper-parameters, the HuBERT model and the generator checkpoint.

        Args:
            net_g_path: generator checkpoint path; when it contains ``"half"``
                and CUDA is available the model runs in half precision.
            config_path: JSON config read with ``utils.get_hparams_from_file``.
            hubert_path: HuBERT soft checkpoint path.
            onnx: if true, ``load_model`` raises ``NotImplementedError``.
        """
        self.onnx = onnx
        self.net_g_path = net_g_path
        self.hubert_path = hubert_path
        self.dev = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.net_g_ms = None
        self.hps_ms = utils.get_hparams_from_file(config_path)
        self.target_sample = self.hps_ms.data.sampling_rate
        self.hop_size = self.hps_ms.data.hop_length
        # Reverse lookup: speaker id -> speaker name.
        self.speakers = {sid: spk for spk, sid in self.hps_ms.spk.items()}
        self.spk2id = self.hps_ms.spk
        # Load HuBERT.
        self.hubert_soft = hubert_model.hubert_soft(hubert_path)
        if torch.cuda.is_available():
            self.hubert_soft = self.hubert_soft.cuda()
        self.load_model()

    def _use_half(self):
        """Return True when the checkpoint path contains "half" and CUDA is available."""
        return "half" in self.net_g_path and torch.cuda.is_available()

    def load_model(self):
        """Build ``SynthesizerTrn`` from the config, load the checkpoint and switch to eval mode.

        Raises:
            NotImplementedError: when the instance was created with ``onnx=True``.
        """
        if self.onnx:
            raise NotImplementedError("ONNX inference is not implemented")
        self.net_g_ms = SynthesizerTrn(
            self.hps_ms.data.filter_length // 2 + 1,
            self.hps_ms.train.segment_size // self.hps_ms.data.hop_length,
            **self.hps_ms.model)
        _ = utils.load_checkpoint(self.net_g_path, self.net_g_ms, None)
        if self._use_half():
            _ = self.net_g_ms.half().eval().to(self.dev)
        else:
            _ = self.net_g_ms.eval().to(self.dev)

    def get_units(self, source, sr):
        """Run HuBERT on ``source`` and return its soft units; prints the elapsed time.

        ``sr`` is accepted for interface compatibility but is not used here.
        """
        source = source.unsqueeze(0).to(self.dev)
        with torch.inference_mode():
            start = time.time()
            units = self.hubert_soft.units(source)
            use_time = time.time() - start
            print("hubert use time:{}".format(use_time))
            return units

    def get_unit_pitch(self, in_path, tran):
        """Load audio and return ``(soft_units, f0, original_waveform)``.

        Args:
            in_path: path or file-like object accepted by ``torchaudio.load``.
            tran: transposition in semitones forwarded to ``get_f0``.

        Returns:
            The HuBERT soft units as a numpy array, the F0 contour from
            ``get_f0`` (sized to twice the number of unit frames), and an
            untouched copy of the waveform as loaded, before resampling.
        """
        source, sr = torchaudio.load(in_path)
        # Kept for the experimental spectrogram-based path (see ``infer``).
        source_bak = source.clone()
        source = torchaudio.functional.resample(source, sr, 16000)
        # Down-mix multi-channel audio. ``torchaudio.load`` returns
        # (channels, samples), so the channel count is shape[0]. The previous
        # test on shape[1] looked at the sample axis and only worked because
        # averaging a single channel is a no-op.
        if source.dim() == 2 and source.shape[0] >= 2:
            source = torch.mean(source, dim=0).unsqueeze(0)
        soft = self.get_units(source, sr).squeeze(0).cpu().numpy()
        _, f0 = get_f0(source.cpu().numpy()[0], soft.shape[0] * 2, tran)
        return soft, f0, source_bak

    def infer(self, speaker_id, tran, raw_path, dev=False):
        """Convert the audio at ``raw_path`` to the target speaker's voice.

        Args:
            speaker_id: speaker name (looked up in ``spk2id``) or numeric id.
            tran: transposition in semitones.
            raw_path: path or file-like object with the source audio.
            dev: unused; kept for interface compatibility.

        Returns:
            ``(audio, n_samples)``: the generated waveform as a float tensor
            and its length. NOTE(review): callers in this project bind the
            second value to names like ``sr``; it is a sample count, not a
            sample rate.
        """
        if isinstance(speaker_id, str):
            speaker_id = self.spk2id[speaker_id]
        sid = torch.LongTensor([int(speaker_id)]).to(self.dev).unsqueeze(0)
        soft, pitch, _source = self.get_unit_pitch(raw_path, tran)
        f0 = torch.FloatTensor(clean_pitch(pitch)).unsqueeze(0).to(self.dev)
        stn_tst = torch.HalfTensor(soft) if self._use_half() else torch.FloatTensor(soft)

        # Experimental alternative (disabled): extract the magnitude spectrum
        # and decode from it instead of from the HuBERT units.
        # spec = get_spec(_source).to(self.dev)
        with torch.no_grad():
            x_tst = stn_tst.unsqueeze(0).to(self.dev)
            start = time.time()
            # Each unit frame is repeated twice to match the F0 frame count.
            x_tst = torch.repeat_interleave(x_tst, repeats=2, dim=1).transpose(1, 2)
            audio = self.net_g_ms.infer(x_tst, f0=f0, g=sid)[0, 0].data.float()
            # audio = self.net_g_ms.infer_v1(x_tst, spec[:, :, :f0.size(-1)], f0=f0, g=sid)[0, 0].data.float()
            use_time = time.time() - start
            print("vits use time:{}".format(use_time))
        return audio, audio.shape[-1]
# class SvcONNXInferModel(object):
# def __init__(self, hubert_onnx, vits_onnx, config_path):
# self.config_path = config_path
# self.vits_onnx = vits_onnx
# self.hubert_onnx = hubert_onnx
# self.hubert_onnx_session = onnxruntime.InferenceSession(hubert_onnx, providers=['CUDAExecutionProvider', ])
# self.inspect_onnx(self.hubert_onnx_session)
# self.vits_onnx_session = onnxruntime.InferenceSession(vits_onnx, providers=['CUDAExecutionProvider', ])
# self.inspect_onnx(self.vits_onnx_session)
# self.hps_ms = utils.get_hparams_from_file(self.config_path)
# self.target_sample = self.hps_ms.data.sampling_rate
# self.feature_input = FeatureInput(self.hps_ms.data.sampling_rate, self.hps_ms.data.hop_length)
#
# @staticmethod
# def inspect_onnx(session):
# for i in session.get_inputs():
# print("name:{}\tshape:{}\tdtype:{}".format(i.name, i.shape, i.type))
# for i in session.get_outputs():
# print("name:{}\tshape:{}\tdtype:{}".format(i.name, i.shape, i.type))
#
# def infer(self, speaker_id, tran, raw_path):
# sid = np.array([int(speaker_id)], dtype=np.int64)
# soft, pitch = self.get_unit_pitch(raw_path, tran)
# pitch = np.expand_dims(pitch, axis=0).astype(np.int64)
# stn_tst = soft
# x_tst = np.expand_dims(stn_tst, axis=0)
# x_tst_lengths = np.array([stn_tst.shape[0]], dtype=np.int64)
# # 使用ONNX Runtime进行推理
# start = time.time()
# audio = self.vits_onnx_session.run(output_names=["audio"],
# input_feed={
# "hidden_unit": x_tst,
# "lengths": x_tst_lengths,
# "pitch": pitch,
# "sid": sid,
# })[0][0, 0]
# use_time = time.time() - start
# print("vits_onnx_session.run time:{}".format(use_time))
# audio = torch.from_numpy(audio)
# return audio, audio.shape[-1]
#
# def get_units(self, source, sr):
# source = torchaudio.functional.resample(source, sr, 16000)
# if len(source.shape) == 2 and source.shape[1] >= 2:
# source = torch.mean(source, dim=0).unsqueeze(0)
# source = source.unsqueeze(0)
# # 使用ONNX Runtime进行推理
# start = time.time()
# units = self.hubert_onnx_session.run(output_names=["embed"],
# input_feed={"source": source.numpy()})[0]
# use_time = time.time() - start
# print("hubert_onnx_session.run time:{}".format(use_time))
# return units
#
# def transcribe(self, source, sr, length, transform):
# feature_pit = self.feature_input.compute_f0(source, sr)
# feature_pit = feature_pit * 2 ** (transform / 12)
# feature_pit = resize2d_f0(feature_pit, length)
# coarse_pit = self.feature_input.coarse_f0(feature_pit)
# return coarse_pit
#
# def get_unit_pitch(self, in_path, tran):
# source, sr = torchaudio.load(in_path)
# soft = self.get_units(source, sr).squeeze(0)
# input_pitch = self.transcribe(source.numpy()[0], sr, soft.shape[0], tran)
# return soft, input_pitch
class RealTimeVC:
    """Chunked voice conversion that cross-fades each chunk with the previous output."""

    def __init__(self):
        self.last_chunk = None  # tail of the previous converted audio
        self.last_o = None  # full previous converted audio
        self.chunk_len = 16000  # chunk length
        self.pre_len = 3840  # cross-fade length, a multiple of 640

    def process(self, svc_model, speaker_id, f_pitch_change, input_wav_path):
        """Convert one chunk of audio and return the converted samples.

        Original note: input and output are both 1-D numpy audio waveform
        arrays. In this implementation ``input_wav_path`` is read with
        ``torchaudio.load`` and, on the first call, rewound with ``.seek(0)``,
        so it must be a seekable file-like object.

        Args:
            svc_model: object exposing ``infer(speaker_id, tran, raw_path)``.
            speaker_id: target speaker forwarded to ``svc_model.infer``.
            f_pitch_change: transposition forwarded to ``svc_model.infer``.
            input_wav_path: seekable file-like object holding the chunk.

        Returns:
            1-D numpy array with the converted audio for this chunk.
        """
        # Local import: ``io`` is used below but is not among this module's
        # top-level imports, which made this method fail with NameError.
        import io

        audio, sr = torchaudio.load(input_wav_path)
        audio = audio.cpu().numpy()[0]
        if self.last_chunk is None:
            input_wav_path.seek(0)
            audio, sr = svc_model.infer(speaker_id, f_pitch_change, input_wav_path)
            audio = audio.cpu().numpy()
            self.last_chunk = audio[-self.pre_len:]
            self.last_o = audio
            return audio[-self.chunk_len:]

        # Prepend the tail of the previous output so the model sees context
        # that can then be cross-faded away.
        temp_wav = io.BytesIO()
        audio = np.concatenate([self.last_chunk, audio])
        soundfile.write(temp_wav, audio, sr, format="wav")
        temp_wav.seek(0)
        audio, sr = svc_model.infer(speaker_id, f_pitch_change, temp_wav)
        audio = audio.cpu().numpy()
        ret = maad.util.crossfade(self.last_o, audio, self.pre_len)
        self.last_chunk = audio[-self.pre_len:]
        self.last_o = audio
        return ret[self.chunk_len:2 * self.chunk_len]
diff --git a/AutoCoverTool/ref/so_vits_svc/inference_main.py b/AutoCoverTool/ref/so_vits_svc/inference_main.py
index e1579ec..326ad07 100644
--- a/AutoCoverTool/ref/so_vits_svc/inference_main.py
+++ b/AutoCoverTool/ref/so_vits_svc/inference_main.py
@@ -1,83 +1,85 @@
import io
import os
import sys
import logging
import time
from pathlib import Path
+from copy import deepcopy
+import torch
import librosa
import numpy as np
import soundfile
from inference import infer_tool
from inference import slicer
from inference.infer_tool import Svc
# Raise the 'numba' logger to WARNING so its lower-level messages are hidden.
logging.getLogger('numba').setLevel(logging.WARNING)
# Load (or create) the JSON chunk cache at import time.
# NOTE(review): chunks_dict is not referenced anywhere else in the visible part of this file.
chunks_dict = infer_tool.read_temp("ref/so_vits_svc/inference/chunks_temp.json")
def inf(model_path, config_path, raw_audio_path, dst_path, dev):
    """Convert ``raw_audio_path`` with the given model and write the result to ``dst_path``.

    The input is converted to wav if needed, cut into segments by
    ``slicer``, each non-silent segment is run through ``Svc.infer``, silent
    segments are replaced by zeros, and the concatenation is written as wav.

    Args:
        model_path: generator checkpoint path.
        config_path: model config path.
        raw_audio_path: input audio file.
        dst_path: output wav path; its parent directory is created if needed.
        dev: run mode; ``dev == "test"`` is forwarded as the ``dev`` flag of
            ``Svc.infer``.
    """
    svc_model = Svc(model_path, config_path)
    out_dir = os.path.dirname(dst_path)
    print(dst_path)
    # ``os.makedirs("")`` raises, so only create the directory when dst_path
    # actually has a directory component.
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    tran = 0
    spk_list = ['speaker0']  # speakers synthesized in one run
    slice_db = -40  # default -40; noisy audio can use -30, dry vocals that keep breaths -50
    wav_format = 'wav'  # output audio format

    infer_tool.format_wav(raw_audio_path)
    wav_path = Path(raw_audio_path).with_suffix('.wav')
    chunks = slicer.cut(wav_path, db_thresh=slice_db)
    audio_data, audio_sr = slicer.chunks2audio(wav_path, chunks)

    for spk in spk_list:
        segments = []
        for (slice_tag, data) in audio_data:
            print(f'#=====segment start, {round(len(data) / audio_sr, 3)}s======')
            if slice_tag:
                print('jump empty segment')
                # Silence of the same duration at the model's sample rate.
                length = int(np.ceil(len(data) / audio_sr * svc_model.target_sample))
                _audio = np.zeros(length)
            else:
                raw_path = io.BytesIO()
                soundfile.write(raw_path, data, audio_sr, format="wav")
                raw_path.seek(0)
                out_audio, _ = svc_model.infer(spk, tran, raw_path, dev == "test")
                _audio = out_audio.cpu().numpy()
            segments.append(_audio)
        # One concatenation instead of extending a Python list sample by sample.
        audio = np.concatenate(segments) if segments else np.zeros(0)
        soundfile.write(dst_path, audio, svc_model.target_sample, format=wav_format)
if __name__ == '__main__':
    # Usage: inference_main.py MODEL CONFIG INPUT_AUDIO OUTPUT_AUDIO [DEV]
    g_model = sys.argv[1]  # model checkpoint path
    g_config = sys.argv[2]  # config file path
    g_audio_path = sys.argv[3]  # input audio file path (wav)
    g_dst_path = sys.argv[4]  # output audio file path
    if os.path.exists(g_dst_path):
        # Output already present: nothing to do.
        print("{} success ...".format(g_dst_path))
        # ``sys.exit`` instead of the site-provided ``exit`` helper, which is
        # not guaranteed to exist in every interpreter setup.
        sys.exit(0)
    g_dev = sys.argv[5] if len(sys.argv) > 5 else "prod"

    g_aa, g_sr = librosa.load(g_audio_path)
    # Keyword arguments: librosa >= 0.10 rejects positional ``y``/``sr`` here.
    d = librosa.get_duration(y=g_aa, sr=g_sr)
    # NOTE: the duration gate below is currently disabled, so ``d`` is unused.
    # if g_dev != "test":
    #     if d > 250:
    #         print("{} too long".format(g_audio_path))
    #         exit(0)

    st = time.time()
    inf(g_model, g_config, g_audio_path, g_dst_path, g_dev)
    print("{}, inference sp={}".format(g_audio_path, time.time() - st))
diff --git a/AutoCoverTool/ref/so_vits_svc/losses.py b/AutoCoverTool/ref/so_vits_svc/losses.py
index 41f9be6..c314696 100644
--- a/AutoCoverTool/ref/so_vits_svc/losses.py
+++ b/AutoCoverTool/ref/so_vits_svc/losses.py
@@ -1,61 +1,62 @@
import torch
from torch.nn import functional as F
import commons
def feature_loss(fmap_r, fmap_g):
    """Return twice the summed mean absolute difference between paired feature maps.

    ``fmap_r`` and ``fmap_g`` are nested sequences of tensors that are walked
    in lockstep. The first argument's maps are detached, so gradients only
    flow through ``fmap_g``.
    """
    loss = 0
    for real_maps, gen_maps in zip(fmap_r, fmap_g):
        for real, gen in zip(real_maps, gen_maps):
            target = real.float().detach()
            loss += torch.mean(torch.abs(target - gen.float()))
    return loss * 2
def discriminator_loss(disc_real_outputs, disc_generated_outputs):
    """Sum ``mean((1 - real) ** 2) + mean(generated ** 2)`` over paired discriminator outputs.

    Returns:
        ``(loss, r_losses, g_losses)``: the total, plus the per-discriminator
        real and generated terms as Python floats.
    """
    loss = 0
    r_losses, g_losses = [], []
    for real_out, gen_out in zip(disc_real_outputs, disc_generated_outputs):
        real_term = torch.mean((1 - real_out.float()) ** 2)
        gen_term = torch.mean(gen_out.float() ** 2)
        loss += (real_term + gen_term)
        r_losses.append(real_term.item())
        g_losses.append(gen_term.item())
    return loss, r_losses, g_losses
def generator_loss(disc_outputs):
    """Sum ``mean((1 - output) ** 2)`` over the discriminator outputs.

    Returns:
        ``(loss, gen_losses)``: the total and the list of per-discriminator
        terms (tensors).
    """
    loss = 0
    gen_losses = []
    for output in disc_outputs:
        term = torch.mean((1 - output.float()) ** 2)
        gen_losses.append(term)
        loss += term
    return loss, gen_losses
def kl_loss(z_p, logs_q, m_p, logs_p, z_mask):
    """
    Masked KL-style loss between the posterior sample and the prior.

    z_p, logs_q: [b, h, t_t]
    m_p, logs_p: [b, h, t_t]

    Per element this is
    ``logs_p - logs_q - 0.5 + 0.5 * (z_p - m_p) ** 2 * exp(-2 * logs_p)``,
    summed under ``z_mask`` and divided by ``sum(z_mask)``.
    """
    z_p, logs_q, m_p, logs_p, z_mask = (
        t.float() for t in (z_p, logs_q, m_p, logs_p, z_mask)
    )
    log_ratio = logs_p - logs_q - 0.5
    quadratic = 0.5 * ((z_p - m_p) ** 2) * torch.exp(-2. * logs_p)
    masked_total = torch.sum((log_ratio + quadratic) * z_mask)
    return masked_total / torch.sum(z_mask)
diff --git a/AutoCoverTool/ref/so_vits_svc/models.py b/AutoCoverTool/ref/so_vits_svc/models.py
index 3e3498b..6fdea53 100644
--- a/AutoCoverTool/ref/so_vits_svc/models.py
+++ b/AutoCoverTool/ref/so_vits_svc/models.py
@@ -1,357 +1,388 @@
import copy
import math
import torch
from torch import nn
from torch.nn import functional as F
import attentions
import commons
import modules
from torch.nn import Conv1d, ConvTranspose1d, AvgPool1d, Conv2d
from torch.nn.utils import weight_norm, remove_weight_norm, spectral_norm
from commons import init_weights, get_padding
from vdecoder.hifigan.models import Generator
from utils import f0_to_coarse
class ResidualCouplingBlock(nn.Module):
    """Stack of ``n_flows`` (ResidualCouplingLayer, Flip) pairs that can run forward or in reverse."""

    def __init__(self,
                 channels,
                 hidden_channels,
                 kernel_size,
                 dilation_rate,
                 n_layers,
                 n_flows=4,
                 gin_channels=0):
        super().__init__()
        self.channels = channels
        self.hidden_channels = hidden_channels
        self.kernel_size = kernel_size
        self.dilation_rate = dilation_rate
        self.n_layers = n_layers
        self.n_flows = n_flows
        self.gin_channels = gin_channels

        self.flows = nn.ModuleList()
        for _ in range(n_flows):
            coupling = modules.ResidualCouplingLayer(
                channels, hidden_channels, kernel_size, dilation_rate, n_layers,
                gin_channels=gin_channels, mean_only=True)
            self.flows.extend([coupling, modules.Flip()])

    def forward(self, x, x_mask, g=None, reverse=False):
        """Apply the flows in order, or in reversed order when ``reverse`` is true."""
        if reverse:
            for flow in reversed(self.flows):
                x = flow(x, x_mask, g=g, reverse=reverse)
            return x
        for flow in self.flows:
            # In the forward direction each flow also returns a second value, unused here.
            x, _ = flow(x, x_mask, g=g, reverse=reverse)
        return x
class Encoder(nn.Module):
    """Conv-in, WaveNet-style ``modules.WN`` body, conv-out encoder that samples ``z`` from (m, logs)."""

    def __init__(self,
                 in_channels,
                 out_channels,
                 hidden_channels,
                 kernel_size,
                 dilation_rate,
                 n_layers,
                 gin_channels=0):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.hidden_channels = hidden_channels
        self.kernel_size = kernel_size
        self.dilation_rate = dilation_rate
        self.n_layers = n_layers
        self.gin_channels = gin_channels

        self.pre = nn.Conv1d(in_channels, hidden_channels, 1)
        self.enc = modules.WN(hidden_channels, kernel_size, dilation_rate, n_layers, gin_channels=gin_channels)
        self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1)

    def forward(self, x, x_lengths, g=None):
        """Return ``(z, m, logs, x_mask)`` for input ``x`` with per-item frame counts ``x_lengths``."""
        # commons.sequence_mask matters at batch level: x_lengths holds the
        # frame count of every item in the batch. For example ([3, 5, 2], 5)
        # gives a 3 x 5 True/False matrix whose rows have 3, 5 and 2 leading
        # True values. This lets items of different lengths share a batch:
        # the padding of shorter items is multiplied by False (0) and so has
        # no influence.
        frame_mask = commons.sequence_mask(x_lengths, x.size(2))
        x_mask = torch.unsqueeze(frame_mask, 1).to(x.dtype)
        hidden = self.pre(x) * x_mask
        hidden = self.enc(hidden, x_mask, g=g)
        stats = self.proj(hidden) * x_mask
        m, logs = torch.split(stats, self.out_channels, dim=1)
        noise = torch.randn_like(m)
        z = (m + noise * torch.exp(logs)) * x_mask
        return z, m, logs, x_mask
class TextEncoder(nn.Module):
    """Attention-based encoder over content features plus an F0 embedding; samples ``z`` from (m, logs)."""

    def __init__(self,
                 in_channels,
                 out_channels,
                 hidden_channels,
                 kernel_size,
                 dilation_rate,
                 n_layers,
                 gin_channels=0,
                 filter_channels=None,
                 n_heads=None,
                 p_dropout=None):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.hidden_channels = hidden_channels
        self.kernel_size = kernel_size
        self.dilation_rate = dilation_rate
        self.n_layers = n_layers
        self.gin_channels = gin_channels

        self.pre = nn.Conv1d(in_channels, hidden_channels, 1)
        self.proj = nn.Conv1d(hidden_channels, out_channels * 2, 1)
        # One embedding vector per coarse F0 bin (256 bins).
        self.f0_emb = nn.Embedding(256, hidden_channels)
        self.enc_ = attentions.Encoder(
            hidden_channels,
            filter_channels,
            n_heads,
            n_layers,
            kernel_size,
            p_dropout)

    def forward(self, x, x_lengths, f0=None):
        """Return ``(z, m, logs, x_mask)``; ``f0`` holds the coarse F0 indices fed to ``f0_emb``."""
        # x -> (b, 256, frame_num), x_lengths -> (b)
        # commons.sequence_mask matters at batch level: x_lengths holds the
        # frame count of every item in the batch. For example ([3, 5, 2], 5)
        # gives a 3 x 5 True/False matrix whose rows have 3, 5 and 2 leading
        # True values. This lets items of different lengths share a batch:
        # the padding of shorter items is multiplied by False (0) and so has
        # no influence.
        frame_mask = commons.sequence_mask(x_lengths, x.size(2))
        x_mask = torch.unsqueeze(frame_mask, 1).to(x.dtype)
        hidden = self.pre(x) * x_mask
        hidden = hidden + self.f0_emb(f0).transpose(1, 2)
        hidden = self.enc_(hidden * x_mask, x_mask)
        stats = self.proj(hidden) * x_mask
        # m is the mu of the VAE; logs corresponds to log(sigma).
        m, logs = torch.split(stats, self.out_channels, dim=1)
        # z is the random sampling step.
        noise = torch.randn_like(m)
        z = (m + noise * torch.exp(logs)) * x_mask
        return z, m, logs, x_mask
class DiscriminatorP(torch.nn.Module):
    """Period discriminator: folds the waveform into (t // period, period) and applies 2-D convs."""

    def __init__(self, period, kernel_size=5, stride=3, use_spectral_norm=False):
        super(DiscriminatorP, self).__init__()
        self.period = period
        self.use_spectral_norm = use_spectral_norm
        # Kept as an explicit comparison with False to preserve the original
        # selection for every possible argument value.
        norm_f = weight_norm if use_spectral_norm == False else spectral_norm
        pad = (get_padding(kernel_size, 1), 0)
        # (in_channels, out_channels, stride along the folded time axis)
        plan = [
            (1, 32, stride),
            (32, 128, stride),
            (128, 512, stride),
            (512, 1024, stride),
            (1024, 1024, 1),
        ]
        self.convs = nn.ModuleList([
            norm_f(Conv2d(c_in, c_out, (kernel_size, 1), (step, 1), padding=pad))
            for c_in, c_out, step in plan
        ])
        self.conv_post = norm_f(Conv2d(1024, 1, (3, 1), 1, padding=(1, 0)))

    def forward(self, x):
        """Return ``(flattened_score, feature_maps)`` for a ``(b, c, t)`` input."""
        fmap = []

        # 1d to 2d
        b, c, t = x.shape
        remainder = t % self.period
        if remainder != 0:  # pad first
            n_pad = self.period - remainder
            x = F.pad(x, (0, n_pad), "reflect")
            t = t + n_pad
        x = x.view(b, c, t // self.period, self.period)

        for conv in self.convs:
            x = F.leaky_relu(conv(x), modules.LRELU_SLOPE)
            fmap.append(x)
        x = self.conv_post(x)
        fmap.append(x)
        return torch.flatten(x, 1, -1), fmap
class DiscriminatorS(torch.nn.Module):
    """Scale discriminator: a stack of (grouped) 1-D convolutions over the raw waveform."""

    def __init__(self, use_spectral_norm=False):
        super(DiscriminatorS, self).__init__()
        # Kept as an explicit comparison with False to preserve the original
        # selection for every possible argument value.
        norm_f = weight_norm if use_spectral_norm == False else spectral_norm
        # (in_channels, out_channels, kernel_size, stride, groups, padding)
        plan = [
            (1, 16, 15, 1, 1, 7),
            (16, 64, 41, 4, 4, 20),
            (64, 256, 41, 4, 16, 20),
            (256, 1024, 41, 4, 64, 20),
            (1024, 1024, 41, 4, 256, 20),
            (1024, 1024, 5, 1, 1, 2),
        ]
        self.convs = nn.ModuleList([
            norm_f(Conv1d(c_in, c_out, kernel, step, groups=groups, padding=pad))
            for c_in, c_out, kernel, step, groups, pad in plan
        ])
        self.conv_post = norm_f(Conv1d(1024, 1, 3, 1, padding=1))

    def forward(self, x):
        """Return ``(flattened_score, feature_maps)``."""
        fmap = []
        for conv in self.convs:
            x = F.leaky_relu(conv(x), modules.LRELU_SLOPE)
            fmap.append(x)
        x = self.conv_post(x)
        fmap.append(x)
        return torch.flatten(x, 1, -1), fmap
class MultiPeriodDiscriminator(torch.nn.Module):
    """One ``DiscriminatorS`` followed by ``DiscriminatorP`` instances for periods 2, 3, 5, 7, 11."""

    def __init__(self, use_spectral_norm=False):
        super(MultiPeriodDiscriminator, self).__init__()
        periods = [2, 3, 5, 7, 11]
        scale_disc = [DiscriminatorS(use_spectral_norm=use_spectral_norm)]
        period_discs = [DiscriminatorP(p, use_spectral_norm=use_spectral_norm) for p in periods]
        self.discriminators = nn.ModuleList(scale_disc + period_discs)

    def forward(self, y, y_hat):
        """Run every discriminator on ``y`` and ``y_hat``.

        Returns:
            ``(y_d_rs, y_d_gs, fmap_rs, fmap_gs)``: per-discriminator scores
            and feature maps for ``y`` and for ``y_hat``.
        """
        y_d_rs, y_d_gs, fmap_rs, fmap_gs = [], [], [], []
        for disc in self.discriminators:
            real_score, real_fmap = disc(y)
            gen_score, gen_fmap = disc(y_hat)
            y_d_rs.append(real_score)
            y_d_gs.append(gen_score)
            fmap_rs.append(real_fmap)
            fmap_gs.append(gen_fmap)
        return y_d_rs, y_d_gs, fmap_rs, fmap_gs
class SpeakerEncoder(torch.nn.Module):
    """LSTM speaker encoder producing an L2-normalized embedding from mel frames."""

    def __init__(self, mel_n_channels=80, model_num_layers=3, model_hidden_size=256, model_embedding_size=256):
        super(SpeakerEncoder, self).__init__()
        self.lstm = nn.LSTM(mel_n_channels, model_hidden_size, model_num_layers, batch_first=True)
        self.linear = nn.Linear(model_hidden_size, model_embedding_size)
        self.relu = nn.ReLU()

    def forward(self, mels):
        """Embed a batch of mel sequences ``(batch, frames, mel_n_channels)``."""
        self.lstm.flatten_parameters()
        _, (hidden, _) = self.lstm(mels)
        # Use the last layer's final hidden state.
        raw = self.relu(self.linear(hidden[-1]))
        return raw / torch.norm(raw, dim=1, keepdim=True)

    def compute_partial_slices(self, total_frames, partial_frames, partial_hop):
        """Return index tensors of ``partial_frames`` frames, starting every ``partial_hop`` frames."""
        return [
            torch.arange(start, start + partial_frames)
            for start in range(0, total_frames - partial_frames, partial_hop)
        ]

    def embed_utterance(self, mel, partial_frames=128, partial_hop=64):
        """Embed a whole utterance ``mel`` of shape ``(1, frames, mel_n_channels)``.

        Utterances longer than ``partial_frames`` are split into overlapping
        windows (plus the trailing window) whose embeddings are averaged;
        shorter ones are embedded directly.
        """
        mel_len = mel.size(1)
        last_mel = mel[:, -partial_frames:]
        if mel_len <= partial_frames:
            with torch.no_grad():
                return self(last_mel)

        windows = [mel[:, idx] for idx in self.compute_partial_slices(mel_len, partial_frames, partial_hop)]
        windows.append(last_mel)
        batch = torch.stack(tuple(windows), 0).squeeze(1)
        with torch.no_grad():
            partial_embeds = self(batch)
        return torch.mean(partial_embeds, axis=0).unsqueeze(0)
class SynthesizerTrn(nn.Module):
    """
    Synthesizer for Training
    """

    def __init__(self,
                 spec_channels,
                 segment_size,
                 inter_channels,
                 hidden_channels,
                 filter_channels,
                 n_heads,
                 n_layers,
                 kernel_size,
                 p_dropout,
                 resblock,
                 resblock_kernel_sizes,
                 resblock_dilation_sizes,
                 upsample_rates,
                 upsample_initial_channel,
                 upsample_kernel_sizes,
                 gin_channels,
                 ssl_dim,
                 n_speakers,
                 **kwargs):
        super().__init__()
        self.spec_channels = spec_channels
        self.inter_channels = inter_channels
        self.hidden_channels = hidden_channels
        self.filter_channels = filter_channels
        self.n_heads = n_heads
        self.n_layers = n_layers
        self.kernel_size = kernel_size
        self.p_dropout = p_dropout
        self.resblock = resblock
        self.resblock_kernel_sizes = resblock_kernel_sizes
        self.resblock_dilation_sizes = resblock_dilation_sizes
        self.upsample_rates = upsample_rates
        self.upsample_initial_channel = upsample_initial_channel
        self.upsample_kernel_sizes = upsample_kernel_sizes
        self.segment_size = segment_size
        self.gin_channels = gin_channels
        self.ssl_dim = ssl_dim
        self.emb_g = nn.Embedding(n_speakers, gin_channels)

        self.enc_p_ = TextEncoder(ssl_dim, inter_channels, hidden_channels, 5, 1, 16, 0, filter_channels, n_heads, p_dropout)
        # NOTE(review): the decoder config is hard-coded and ignores the
        # matching constructor arguments (upsample_rates, resblock, ...).
        # Changing it would break existing checkpoints, so it is kept as is.
        hps = {
            "sampling_rate": 32000,
            "inter_channels": 192,
            "resblock": "1",
            "resblock_kernel_sizes": [3, 7, 11],
            "resblock_dilation_sizes": [[1, 3, 5], [1, 3, 5], [1, 3, 5]],
            "upsample_rates": [10, 8, 2, 2],
            "upsample_initial_channel": 512,
            "upsample_kernel_sizes": [16, 16, 4, 4],
            "gin_channels": 256,
        }
        self.dec = Generator(h=hps)
        self.enc_q = Encoder(spec_channels, inter_channels, hidden_channels, 5, 1, 16, gin_channels=gin_channels)
        self.flow = ResidualCouplingBlock(inter_channels, hidden_channels, 5, 1, 4, gin_channels=gin_channels)

    def forward(self, c, f0, spec, g=None, mel=None, c_lengths=None, spec_lengths=None):
        """Training forward pass.

        Args:
            c: HuBERT features (b, 256, frame_num).
            f0: F0 contour (frame_num).
            spec: magnitude-spectrum features.
            g: speaker id.
            mel: mel-spectrum features (unused in this method).
            c_lengths, spec_lengths: valid frame counts per batch item;
                default to the full length when omitted.

        Returns:
            ``(o, ids_slice, spec_mask, (z, z_p, m_p, logs_p, m_q, logs_q))``.
        """
        if c_lengths is None:
            c_lengths = (torch.ones(c.size(0)) * c.size(-1)).to(c.device)  # (b, frame_num)
        if spec_lengths is None:
            spec_lengths = (torch.ones(spec.size(0)) * spec.size(-1)).to(spec.device)

        # Speaker embedding.
        g = self.emb_g(g).transpose(1, 2)

        # Prior encoder: takes the HuBERT features and coarse F0. It returns
        # the sampled z, the VAE mean m_p and logs_p = log(sigma). This is the
        # part of the model that produces the latent from the content side.
        z_ptemp, m_p, logs_p, _ = self.enc_p_(c, c_lengths, f0=f0_to_coarse(f0))

        # Posterior encoder: takes the magnitude spectrum and speaker info.
        # It returns the sampled z, the mean m_q and logs_q = log(sigma).
        z, m_q, logs_q, spec_mask = self.enc_q(spec, spec_lengths, g=g)

        # Normalizing flow, increasing the complexity of the distribution.
        z_p = self.flow(z, spec_mask, g=g)
        # Frame counts differ across the batch, so every item is randomly
        # cropped to segment_size frames. Returns the batched z slices, the
        # matching pitch slices and the slice start indices.
        z_slice, pitch_slice, ids_slice = commons.rand_slice_segments_with_pitch(z, f0, spec_lengths, self.segment_size)

        # Decoder: takes the z that did NOT go through the flow, plus speaker
        # info and pitch, and produces the waveform.
        o = self.dec(z_slice, g=g, f0=pitch_slice)

        # Returned: waveform; per-item slice positions; valid-frame mask of
        # the spectrum; z sampled from the posterior; z_p (z after the flow);
        # prior mean m_p and log-sigma logs_p from the HuBERT side; posterior
        # mean m_q and log-sigma logs_q from the spectrum and speaker side.
        return o, ids_slice, spec_mask, (z, z_p, m_p, logs_p, m_q, logs_q)

    def infer(self, c, f0, g=None, mel=None, c_lengths=None):
        """Generate a waveform from content features ``c``, ``f0`` and speaker id ``g``.

        Samples from the prior encoder, inverts the flow and decodes.
        """
        if c_lengths is None:
            c_lengths = (torch.ones(c.size(0)) * c.size(-1)).to(c.device)

        g = self.emb_g(g).transpose(1, 2)

        z_p, m_p, logs_p, c_mask = self.enc_p_(c, c_lengths, f0=f0_to_coarse(f0))
        z = self.flow(z_p, c_mask, g=g, reverse=True)

        o = self.dec(z * c_mask, g=g, f0=f0)

        return o

    def infer_v1(self, c, spec, f0, g):
        """Experimental inference that decodes straight from the posterior encoder.

        The latent is sampled from ``enc_q(spec, g)`` and decoded without the
        prior encoder or the flow; ``c`` is accepted for signature parity
        with ``infer`` but is not used.
        """
        spec_lengths = (torch.ones(spec.size(0)) * spec.size(-1)).to(spec.device)
        g = self.emb_g(g).transpose(1, 2)

        z, _, _, _ = self.enc_q(spec, spec_lengths, g=g)
        return self.dec(z, g=g, f0=f0)

File Metadata

Mime Type
text/x-diff
Expires
Sun, Jan 12, 08:30 (1 d, 11 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1347158
Default Alt Text
(33 KB)

Event Timeline