Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F4880361
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
20 KB
Subscribers
None
View Options
diff --git a/AIMeiSheng/docker_demo/common.py b/AIMeiSheng/docker_demo/common.py
index 64aba31..2de53e1 100644
--- a/AIMeiSheng/docker_demo/common.py
+++ b/AIMeiSheng/docker_demo/common.py
@@ -1,122 +1,128 @@
import os
import sys
import time
# import logging
import urllib, urllib.request
# Test / production environment switch
gs_prod = True
# if len(sys.argv) > 1 and sys.argv[1] == "prod":
# gs_prod = True
# print(gs_prod)
gs_tmp_dir = "/tmp/ai_meisheng_tmp"
gs_model_dir = "/tmp/ai_meisheng_models"
gs_resource_cache_dir = "/tmp/ai_meisheng_resource_cache"
gs_embed_model_path = os.path.join(gs_model_dir, "RawNet3/models/weights/model.pt")
gs_svc_model_path = os.path.join(gs_model_dir,
"weights/xusong_v2_org_version_alldata_embed_spkenx200x_double_e14_s90706.pth")
gs_hubert_model_path = os.path.join(gs_model_dir, "hubert.pt")
gs_rmvpe_model_path = os.path.join(gs_model_dir, "rmvpe.pt")
gs_embed_model_spk_path = os.path.join(gs_model_dir, "SpeakerEncoder/pretrained_model/best_model.pth.tar")
gs_embed_config_spk_path = os.path.join(gs_model_dir, "SpeakerEncoder/pretrained_model/config.json")
# errcode
gs_err_code_success = 0
gs_err_code_download_vocal = 100
gs_err_code_download_svc_url = 101
gs_err_code_svc_process = 102
gs_err_code_transcode = 103
gs_err_code_volume_adjust = 104
gs_err_code_upload = 105
gs_err_code_params = 106
gs_err_code_pending = 107
gs_err_code_target_silence = 108
gs_err_code_too_many_connections = 429
gs_err_code_gender_classify = 430
gs_redis_conf = {
"host": "av-credis.starmaker.co",
"port": 6379,
"pwd": "lKoWEhz%jxTO",
}
+# gs_server_redis_conf = {
+# "producer": "dev_ai_meisheng_producer", # 输入的队列
+# "ai_meisheng_key_prefix": "dev_ai_meisheng_key_", # 存储结果情况
+# }
+
gs_server_redis_conf = {
"producer": "test_ai_meisheng_producer", # 输入的队列
"ai_meisheng_key_prefix": "test_ai_meisheng_key_", # 存储结果情况
}
if gs_prod:
gs_server_redis_conf = {
"producer": "ai_meisheng_producer", # 输入的队列
"ai_meisheng_key_prefix": "ai_meisheng_key_", # 存储结果情况
}
gs_feishu_conf = {
"url": "http://sg-prod-songbook-webmp-1:8000/api/feishu/people",
"users": [
"18810833785", # 杨建利
"17778007843", # 王健军
- "18612496315" # 郭子豪
+ "18612496315", # 郭子豪
+ "18600542290" # 方兵晓
]
}
def download2disk(url, dst_path):
    """Download ``url`` to ``dst_path``.

    :param url: source URL handed to ``urllib.request.urlretrieve``
    :param dst_path: local file path to write
    :return: True when the download finished and ``dst_path`` exists; False when
        anything raised (the error is printed, not re-raised)
    """
    succeeded = False
    try:
        urllib.request.urlretrieve(url, dst_path)
        succeeded = os.path.exists(dst_path)
    except Exception as err:
        print(f"download url={url} error", err)
    return succeeded
def exec_cmd(cmd):
    """Print a shell command, run it with ``os.system`` and report success.

    :param cmd: shell command line
    :return: True when ``os.system`` returned 0, otherwise False
    """
    # gs_logger.info(cmd)
    print(cmd)
    return os.system(cmd) == 0
def exec_cmd_and_result(cmd):
    """Run a shell command and return everything it wrote to stdout.

    :param cmd: shell command line passed to ``os.popen``
    :return: the command's standard output as text
    """
    # The context manager closes the pipe even if reading raises.
    with os.popen(cmd) as pipe:
        return pipe.read()
def upload_file2cos(key, file_path, region='ap-singapore', bucket_name='av-audit-sync-sg-1256122840'):
    """
    Upload a file to COS with the ``coscmd`` command-line tool and verify it.

    After the upload, ``coscmd info`` is queried and the reported
    Content-Length must be a positive number for the upload to count.

    :param key: object key (destination path) inside the bucket
    :param file_path: local file to upload
    :param region: COS region
    :param bucket_name: COS bucket name
    :return: True when the upload command succeeded and the stored object
        reports a size greater than zero, otherwise False
    """
    gs_coscmd = "coscmd"
    gs_coscmd_conf = "~/.cos.conf"
    cmd = "{} -c {} -r {} -b {} upload {} {}".format(gs_coscmd, gs_coscmd_conf, region, bucket_name, file_path, key)
    if not exec_cmd(cmd):
        return False

    # The awk script is concatenated after format() so its braces are not
    # interpreted as replacement fields.
    cmd = "{} -c {} -r {} -b {} info {}".format(gs_coscmd, gs_coscmd_conf, region, bucket_name, key) \
          + "| grep Content-Length |awk '{print $2}'"
    res_str = exec_cmd_and_result(cmd)
    # logging.info("{},res={}".format(key, res_str))
    try:
        size = float(res_str)
    except ValueError:
        # Empty or non-numeric output means the object could not be inspected;
        # report a failed upload instead of raising.
        return False
    return size > 0
def check_input(input_data):
    """Tell whether a job payload carries every required field.

    :param input_data: mapping decoded from the queue message
    :return: True when all required keys are present, otherwise False
    """
    required = {
        "record_song_url", "target_url", "start", "end", "vocal_loudness",
        "female_recording_url", "male_recording_url",
    }
    return required.issubset(input_data.keys())
diff --git a/AIMeiSheng/docker_demo/offline_server.py b/AIMeiSheng/docker_demo/offline_server.py
index 7e5ea58..f92592e 100644
--- a/AIMeiSheng/docker_demo/offline_server.py
+++ b/AIMeiSheng/docker_demo/offline_server.py
@@ -1,154 +1,166 @@
# -*- coding: UTF-8 -*-
"""
Offline processing: communicates through Redis — job data is read from Redis and the results are written back to Redis
"""
import os
import sys
import time
import json
import socket
import hashlib
from redis_helper import RedisHelper
from cos_helper import CosHelper
from common import *
import logging
+from feishu_helper import feishu_send
from svc_online import GSWorkerAttr, SVCOnline, volume_adjustment, svc_offline_logger
sys.path.append(os.path.dirname(__file__))
sys.path.append(os.path.join(os.path.dirname(__file__), "../"))
def download_data(worker_attr):
    """Fetch the user vocal and both reference recordings for one job.

    :param worker_attr: job attributes providing ``vocal_url``/``vocal_path`` and
        the ``female_``/``male_svc_source_url`` and ``..._path`` pairs
    :return: ``gs_err_code_success``, ``gs_err_code_download_vocal`` when the
        vocal download fails, or ``gs_err_code_download_svc_url`` when a
        reference recording download fails
    """
    # The vocal is always downloaded again, so remove any existing copy first.
    if os.path.exists(worker_attr.vocal_path):
        os.unlink(worker_attr.vocal_path)
    began = time.time()
    fetched = download2disk(worker_attr.vocal_url, worker_attr.vocal_path)
    if not fetched:
        return gs_err_code_download_vocal
    svc_offline_logger.info(f"download vocal_url={worker_attr.vocal_url} sp = {time.time() - began}")

    # download svc_source_url (female first, then male); a recording that is
    # already on disk is not downloaded again.
    for label in ("female", "male"):
        local_path = getattr(worker_attr, f"{label}_svc_source_path")
        if os.path.exists(local_path):
            continue
        began = time.time()
        source_url = getattr(worker_attr, f"{label}_svc_source_url")
        if not download2disk(source_url, local_path):
            return gs_err_code_download_svc_url
        svc_offline_logger.info(f"download {label}_url={source_url} sp = {time.time() - began}")
    return gs_err_code_success
def transcode(wav_path, dst_path):
    """Transcode a wav file with ffmpeg (44.1 kHz, mono, 64 kbit/s audio).

    :param wav_path: input audio file
    :param dst_path: output file; overwritten if present (``-y``)
    :return: True only when ffmpeg exited with status 0 and ``dst_path`` exists
    """
    st = time.time()
    cmd = f"ffmpeg -i {wav_path} -ar 44100 -ac 1 -b:a 64k -y {dst_path} -loglevel fatal"
    # Keep the exit status: an output file alone does not prove ffmpeg succeeded.
    succeeded = exec_cmd(cmd)
    svc_offline_logger.info(f"transcode cmd={cmd}, sp = {time.time() - st}")
    return succeeded and os.path.exists(dst_path)
class OfflineServer:
    """Offline worker: pops jobs from a Redis list, runs the SVC pipeline on
    each one and publishes progress and results under a per-job Redis key."""

    def __init__(self, redis_conf, server_conf, update_redis=False):
        """
        :param redis_conf: connection settings handed to ``RedisHelper``
        :param server_conf: dict with the ``producer`` queue name and the
            ``ai_meisheng_key_prefix`` used to build per-job result keys
        :param update_redis: when False, ``update_result`` does not write to Redis
        """
        st = time.time()
        self.redis_helper = RedisHelper(redis_conf)
        self.cos_helper = CosHelper()
        self.svc_online = SVCOnline()
        self.server_conf = server_conf
        self.distinct_key = server_conf["ai_meisheng_key_prefix"]
        self.update_redis = update_redis
        # Never write the Redis password to the log file.
        safe_conf = redis_conf
        if isinstance(redis_conf, dict):
            safe_conf = {k: ("***" if k == "pwd" else v) for k, v in redis_conf.items()}
        svc_offline_logger.info(f"config={safe_conf}---server_conf={self.server_conf}")
        svc_offline_logger.info(f"offline init finish sp={time.time() - st}")

    def exists(self):
        """Return whatever ``RedisHelper.exists`` reports for the current job key."""
        return self.redis_helper.exists(self.distinct_key)

    def update_result(self, errcode, schedule, gender, target_song_url):
        """Publish the job state under the current job key.

        :param errcode: status code stored as ``status``
        :param schedule: progress value (callers pass 35/85/90/95/100)
        :param gender: detected gender, or a placeholder while unknown
        :param target_song_url: URL the result is (or will be) uploaded to
        """
        msg = {
            "status": errcode,
            "schedule": schedule,
            "gender": gender,
            "target_song_url": target_song_url,
        }
        # The result is kept for 60 * 10 seconds (10 minutes).
        # NOTE(review): an earlier comment said 15 min; confirm the intended TTL.
        if self.update_redis:
            self.redis_helper.set(self.distinct_key, json.dumps(msg))
            self.redis_helper.expire(self.distinct_key, 60 * 10)

    def process_one(self, worker_attr):
        """Run one job: download, convert, loudness-adjust, transcode, upload.

        :param worker_attr: ``GSWorkerAttr`` describing the job
        :return: ``(errcode, target_url, gender)``; on failure the last two
            items are None
        """
        self.distinct_key = self.server_conf["ai_meisheng_key_prefix"] + worker_attr.distinct_id
        svc_offline_logger.info(f"{worker_attr.log_info_name()}, start download ...")
        err = download_data(worker_attr)
        if err != gs_err_code_success:
            self.update_result(err, 100, "unknown", worker_attr.target_url)
            return err, None, None
        self.update_result(err, 35, "unknown", worker_attr.target_url)

        svc_offline_logger.info(f"{worker_attr.log_info_name()}, start process ...")
        gender, err_code = self.svc_online.process(worker_attr)
        if err_code == gs_err_code_target_silence:  # unvoice err
            return gs_err_code_target_silence, None, None
        if err_code == gs_err_code_gender_classify:
            # Gender classification failed before any conversion ran; report
            # that code instead of masking it as a generic svc_process error.
            return gs_err_code_gender_classify, None, None
        if not os.path.exists(worker_attr.target_wav_path):
            self.update_result(gs_err_code_svc_process, 100, gender, worker_attr.target_url)
            return gs_err_code_svc_process, None, None
        self.update_result(err, 85, gender, worker_attr.target_url)

        # Stretch the volume to the requested loudness.
        svc_offline_logger.info(f"{worker_attr.log_info_name()}, start volume_adjustment ...")
        volume_adjustment(worker_attr.target_wav_path, worker_attr.target_loudness, worker_attr.target_wav_ad_path)
        if not os.path.exists(worker_attr.target_wav_ad_path):
            self.update_result(gs_err_code_volume_adjust, 100, gender, worker_attr.target_url)
            return gs_err_code_volume_adjust, None, None
        self.update_result(err, 90, gender, worker_attr.target_url)

        # transcode
        svc_offline_logger.info(f"{worker_attr.log_info_name()}, start transcode ...")
        if not transcode(worker_attr.target_wav_ad_path, worker_attr.target_path):
            self.update_result(gs_err_code_transcode, 100, gender, worker_attr.target_url)
            return gs_err_code_transcode, None, None
        self.update_result(err, 95, gender, worker_attr.target_url)

        # upload
        svc_offline_logger.info(f"{worker_attr.log_info_name()}, start upload_file2cos ...")
        st = time.time()
        if not self.cos_helper.upload_by_url(worker_attr.target_path, worker_attr.target_url):
            self.update_result(gs_err_code_upload, 100, gender, worker_attr.target_url)
            return gs_err_code_upload, None, None
        self.update_result(gs_err_code_success, 100, gender, worker_attr.target_url)
        svc_offline_logger.info(
            f"{worker_attr.log_info_name()} upload {worker_attr.target_url} sp = {time.time() - st}")
        return gs_err_code_success, worker_attr.target_url, gender

    def start_signal(self):
        """
        Send a "server start" message to Feishu when the program starts.

        The notification is best effort: a failed hostname lookup or a failed
        send is logged and must not stop the worker from starting.
        :return:
        """
        try:
            host_name = socket.gethostname()
            host_ip = socket.gethostbyname(host_name)
            msg = f"{host_name}({host_ip}) offline server start"
            feishu_send(gs_feishu_conf["url"], msg, gs_feishu_conf["users"])
        except Exception as ex:
            svc_offline_logger.warning(f"send start signal failed, err={ex}")

    def process(self):
        """Main loop: announce start-up, then consume the producer queue forever."""
        self.start_signal()
        while True:
            data = self.redis_helper.rpop(self.server_conf["producer"])
            if data is None:
                # Queue is empty; poll again in a second.
                time.sleep(1)
                continue

            try:
                data = json.loads(data)
            except (ValueError, TypeError) as ex:
                # One malformed message must not take the whole worker down.
                svc_offline_logger.error(f"input data is not valid json={data!r}, err={ex}")
                continue
            if not isinstance(data, dict) or not check_input(data):
                svc_offline_logger.error(f"input data error={data}")
                continue

            worker_attr = GSWorkerAttr(data)
            self.distinct_key = self.server_conf["ai_meisheng_key_prefix"] + worker_attr.distinct_id
            if not self.exists():
                # The job key is no longer in Redis, so the job is abandoned.
                svc_offline_logger.warning(f"input {data}, timeout abandon ....")
                worker_attr.rm_cache()
                continue

            st = time.time()
            errcode, target_path, gender = self.process_one(worker_attr)
            self.update_result(errcode, 100, gender, target_path)
            svc_offline_logger.info(f"{worker_attr.log_info_name()} finish errcode={errcode} sp = {time.time() - st}")
            worker_attr.rm_cache()
# Script entry point: build the worker from the module-level Redis/server
# configuration with Redis result updates enabled (third argument True) and
# enter its processing loop.
if __name__ == '__main__':
    offline_server = OfflineServer(gs_redis_conf, gs_server_redis_conf, True)
    offline_server.process()
diff --git a/AIMeiSheng/docker_demo/svc_online.py b/AIMeiSheng/docker_demo/svc_online.py
index a04ca83..1606703 100644
--- a/AIMeiSheng/docker_demo/svc_online.py
+++ b/AIMeiSheng/docker_demo/svc_online.py
@@ -1,194 +1,197 @@
# -*- coding: UTF-8 -*-
"""
Core SVC processing logic
"""
import os
import time
import socket
import shutil
import hashlib
from AIMeiSheng.meisheng_svc_final import load_model, process_svc_online
from AIMeiSheng.cos_similar_ui_zoom import cos_similar
from AIMeiSheng.meisheng_env_preparex import meisheng_env_prepare
from AIMeiSheng.voice_classification.online.voice_class_online_fang import VoiceClass, download_volume_balanced
from AIMeiSheng.docker_demo.common import *
import logging
# One log file per host, placed next to this module.
hostname = socket.gethostname()
log_file_name = f"{os.path.dirname(os.path.abspath(__file__))}/av_meisheng_{hostname}.log"
# Set up the logger
svc_offline_logger = logging.getLogger("svc_offline")
file_handler = logging.FileHandler(log_file_name)
file_handler.setLevel(logging.INFO)
# %H (24-hour clock) instead of %I: a 12-hour value without an AM/PM marker
# makes morning and afternoon timestamps indistinguishable.
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
file_handler.setFormatter(formatter)
# NOTE(review): the handler is prepared but not attached to the logger here;
# the attachment below is commented out.
# if gs_prod:
#     svc_offline_logger.addHandler(file_handler)
# Import-time setup: wipe the scratch directory if it exists, and make sure
# the model and resource-cache directories are present.
if os.path.exists(gs_tmp_dir):
    shutil.rmtree(gs_tmp_dir)
os.makedirs(gs_model_dir, exist_ok=True)
os.makedirs(gs_resource_cache_dir, exist_ok=True)
# Preset parameters: download locations of the gender-classification model
# archive and of the ebur128 volume tool binary.
gs_gender_models_url = "https://av-audit-sync-sg-1256122840.cos.ap-singapore.myqcloud.com/hub/voice_classification/models.zip"
gs_volume_bin_url = "https://av-audit-sync-sg-1256122840.cos.ap-singapore.myqcloud.com/dataset/AIMeiSheng/ebur128_tool/v1/ebur128_tool"
class GSWorkerAttr:
    """Per-job attributes: source/target URLs, timing, loudness and the local
    file paths used while the job is processed.

    Constructing an instance (re)creates an empty job directory
    ``<gs_tmp_dir>/<distinct_id>``; ``rm_cache`` removes it again.
    """

    def __init__(self, input_data):
        """
        :param input_data: job payload with the keys ``record_song_url``,
            ``target_url``, ``start``, ``end``, ``vocal_loudness``,
            ``female_recording_url`` and ``male_recording_url``
        """
        # Read the input resources
        vocal_url = input_data["record_song_url"]
        target_url = input_data["target_url"]
        start = input_data["start"]  # in ms
        end = input_data["end"]  # in ms
        vocal_loudness = input_data["vocal_loudness"]
        female_recording_url = input_data["female_recording_url"]
        male_recording_url = input_data["male_recording_url"]

        # The job id is the md5 of the vocal URL.
        self.distinct_id = hashlib.md5(vocal_url.encode()).hexdigest()
        self.tmp_dir = os.path.join(gs_tmp_dir, self.distinct_id)
        if os.path.exists(self.tmp_dir):
            shutil.rmtree(self.tmp_dir)
        os.makedirs(self.tmp_dir)

        self.vocal_url = vocal_url
        self.target_url = target_url

        ext = self._url_ext(vocal_url)
        self.vocal_path = os.path.join(self.tmp_dir, self.distinct_id + f"_in.{ext}")
        self.target_wav_path = os.path.join(self.tmp_dir, self.distinct_id + "_out.wav")
        self.target_wav_ad_path = os.path.join(self.tmp_dir, self.distinct_id + "_out_ad.wav")
        self.target_path = os.path.join(self.tmp_dir, self.distinct_id + "_out.m4a")

        self.female_svc_source_url = female_recording_url
        self.male_svc_source_url = male_recording_url

        # Reference recordings are stored inside the per-job directory (an
        # earlier revision kept them in gs_resource_cache_dir, named by the
        # md5 of their URL).
        ext = self._url_ext(female_recording_url)
        self.female_svc_source_path = os.path.join(self.tmp_dir, self.distinct_id + f"_female.{ext}")
        ext = self._url_ext(male_recording_url)
        self.male_svc_source_path = os.path.join(self.tmp_dir, self.distinct_id + f"_male.{ext}")

        self.st_tm = start
        self.ed_tm = end
        self.target_loudness = vocal_loudness

    @staticmethod
    def _url_ext(url):
        """Return the file extension (without the dot) of the URL's path.

        Only the path component is inspected, so a query string or a dot in
        the host name cannot leak ``?`` or ``/`` characters into the local
        file name. Returns an empty string when the path has no extension.
        """
        from urllib.parse import urlparse

        return os.path.splitext(urlparse(url).path)[1].lstrip(".")

    def log_info_name(self):
        """Return the ``d_id=..., vocal_url=...`` prefix used in log lines."""
        return f"d_id={self.distinct_id}, vocal_url={self.vocal_url}"

    def rm_cache(self):
        """Delete the per-job directory if it exists."""
        if os.path.exists(self.tmp_dir):
            shutil.rmtree(self.tmp_dir)
def init_gender_model():
    """
    Make sure the voice-classification models are on disk and build the classifier.

    When ``<gs_model_dir>/voice_classification`` is missing, the model archive
    is downloaded and unpacked there; failures are logged but not raised.
    :return: a ``VoiceClass`` built from the four model files
    """
    model_root = os.path.join(gs_model_dir, "voice_classification")
    if not os.path.exists(model_root):
        archive_path = os.path.join(gs_model_dir, "models.zip")
        downloaded = download2disk(gs_gender_models_url, archive_path)
        if not downloaded:
            svc_offline_logger.fatal(f"download gender_model err={gs_gender_models_url}")
        # Unpack in the model dir, rename the extracted folder, drop the archive.
        os.system(f"cd {gs_model_dir}; unzip {archive_path}; mv models voice_classification; rm -f {archive_path}")
        if not os.path.exists(model_root):
            svc_offline_logger.fatal(f"unzip {archive_path} err")

    # Order matters: music-voice (pure, non-pure), then gender (pure, non-pure).
    weight_files = (
        "voice_005_rec_v5.pth",
        "voice_10_v5.pth",
        "gender_8k_ratev5_v6_adam.pth",
        "gender_8k_v6_adam.pth",
    )
    weight_paths = [os.path.join(model_root, name) for name in weight_files]
    return VoiceClass(*weight_paths)
def init_svc_model():
    """Prepare the model environment and load the SVC models.

    :return: ``(embed_model, hubert_model, cs_sim)`` where the first two come
        from ``load_model()`` and the last is a ``cos_similar()`` instance
    """
    meisheng_env_prepare(logging, gs_model_dir)
    loaded = load_model()
    embed_model, hubert_model = loaded
    similarity = cos_similar()
    return embed_model, hubert_model, similarity
def download_volume_adjustment():
    """
    Download the volume adjustment tool if it is not on disk and make it executable.

    A failed download is logged, not raised.
    :return:
    """
    volume_bin_path = os.path.join(gs_model_dir, "ebur128_tool")
    if not os.path.exists(volume_bin_path):
        if not download2disk(gs_volume_bin_url, volume_bin_path):
            svc_offline_logger.fatal(f"download volume_bin err={gs_volume_bin_url}")
    # chmod is idempotent, so run it on every call: this also repairs a
    # previously downloaded binary that is missing its executable bit.
    os.system(f"chmod +x {volume_bin_path}")
def volume_adjustment(wav_path, target_loudness, out_path):
    """
    Run the ``ebur128_tool`` binary from the model directory on one file.

    :param wav_path: input wav file
    :param target_loudness: loudness value passed as the tool's second argument
    :param out_path: output file path
    :return:
    """
    tool = os.path.join(gs_model_dir, "ebur128_tool")
    os.system("{} {} {} {}".format(tool, wav_path, target_loudness, out_path))
class SVCOnline:
    """Holds the loaded models and runs gender detection followed by the
    voice conversion for one job."""

    def __init__(self):
        """Load the gender and SVC models and fetch the volume tools."""
        st = time.time()
        self.gender_model = init_gender_model()
        self.embed_model, self.hubert_model, self.cs_sim = init_svc_model()
        download_volume_adjustment()
        download_volume_balanced()
        svc_offline_logger.info(f"svc init finished, sp = {time.time() - st}")

    def gender_process(self, worker_attr):
        """Classify the gender of the job's vocal.

        :param worker_attr: job attributes; ``vocal_path`` is classified
        :return: ``(gender, errcode)`` with gender ``'female'`` or ``'male'``;
            errcode is ``gs_err_code_gender_classify`` when the model gives
            neither a 0/1 label nor a female rate, else ``gs_err_code_success``
        """
        st = time.time()
        gender, female_rate, is_pure = self.gender_model.process(worker_attr.vocal_path)
        svc_offline_logger.info(
            f"{worker_attr.vocal_url}, gender={gender}, female_rate={female_rate}, is_pure={is_pure}, "
            f"gender_process sp = {time.time() - st}")
        if gender == 0:
            gender = 'female'
        elif gender == 1:
            gender = 'male'
        elif female_rate is None:
            # No label and no rate to fall back on: report a classify error.
            gender = 'male'
            return gender, gs_err_code_gender_classify
        elif female_rate > 0.5:
            gender = 'female'
        else:
            gender = 'male'

        svc_offline_logger.info(f"{worker_attr.vocal_url}, modified gender={gender}")
        # err = gs_err_code_success
        # if female_rate == -1:
        #     err = gs_err_code_target_silence
        return gender, gs_err_code_success

    def process(self, worker_attr):
        """Detect the gender, pick the matching reference recording and run
        the conversion.

        :param worker_attr: job attributes
        :return: ``(gender, err_code)``; when gender detection fails its error
            code is returned and no conversion is attempted, otherwise
            ``err_code`` is the value returned by ``process_svc_online``
        """
        gender, err = self.gender_process(worker_attr)
        if err != gs_err_code_success:
            return gender, err

        # Use the reference recording that matches the detected gender.
        song_path = worker_attr.female_svc_source_path
        if gender == "male":
            song_path = worker_attr.male_svc_source_path

        params = {'gender': gender, 'tst': worker_attr.st_tm, "tnd": worker_attr.ed_tm, 'delay': 0, 'song_path': None}
        st = time.time()
        err_code = process_svc_online(song_path, worker_attr.vocal_path, worker_attr.target_wav_path, self.embed_model,
                                      self.hubert_model, self.cs_sim, params)
        svc_offline_logger.info(f"{worker_attr.vocal_url}, err_code={err_code} process svc sp = {time.time() - st}")
        return gender, err_code
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sun, Jan 12, 08:35 (1 d, 11 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1347223
Default Alt Text
(20 KB)
Attached To
R350 av_svc
Event Timeline
Log In to Comment