Page MenuHomePhabricator

No OneTemporary

diff --git a/AIMeiSheng/docker_demo/common.py b/AIMeiSheng/docker_demo/common.py
index 3eff425..25d701e 100644
--- a/AIMeiSheng/docker_demo/common.py
+++ b/AIMeiSheng/docker_demo/common.py
@@ -1,61 +1,93 @@
import os
import time
import logging
import urllib, urllib.request
# Directories under /tmp used by the service (temporary files, models, resource cache, per their names).
gs_tmp_dir = "/tmp/ai_meisheng_tmp"
gs_model_dir = "/tmp/ai_meisheng_models"
gs_resource_cache_dir = "/tmp/ai_meisheng_resource_cache"
# Model file locations, each resolved relative to gs_model_dir.
gs_svc_model_path = os.path.join(gs_model_dir,
"weights/xusong_v2_org_version_alldata_embed1_enzx_diff_fi_e15_s244110.pth")
gs_embed_model_path = os.path.join(gs_model_dir, "RawNet3/models/weights/model.pt")
gs_hubert_model_path = os.path.join(gs_model_dir, "hubert.pt")
gs_rmvpe_model_path = os.path.join(gs_model_dir, "rmvpe.pt")
+# Error codes written to the "status" field of a job's result message
+gs_err_code_success = 0
+gs_err_code_download_vocal = 100
+gs_err_code_download_svc_url = 101
+gs_err_code_svc_process = 102
+gs_err_code_transcode = 103
+gs_err_code_volume_adjust = 104
+gs_err_code_upload = 105
+gs_err_code_params = 106
+gs_err_code_pending = 107
+gs_err_code_too_many_connections = 429
+
+gs_redis_conf = {
+ "host": "av-credis.starmaker.co",
+ "port": 6379,
+ "pwd": "lKoWEhz%jxTO",  # NOTE(review): credential is hard-coded in source; consider loading it from the environment or a secrets store
+}
+
+gs_server_redis_conf = {
+ "producer": "ai_meisheng_producer", # Redis list used as the input job queue
+ "ai_meisheng_key_prefix": "ai_meisheng_key_", # prefix of the per-job key that stores the result/status message
+}
+
def download2disk(url, dst_path):
    """Download ``url`` to ``dst_path`` and report whether the file now exists.

    :param url: source URL, handed to ``urllib.request.urlretrieve``
    :param dst_path: local path the payload is written to
    :return: True when the transfer finished and ``dst_path`` exists,
        False when the transfer failed or the file is missing
    """
    st = time.time()
    try:
        urllib.request.urlretrieve(url, dst_path)
    except (OSError, ValueError) as err:
        # URLError / HTTPError / ContentTooShortError all derive from OSError,
        # and a malformed URL raises ValueError. Callers branch on the boolean
        # result, so a failed transfer must not escape as an exception.
        print(f"download {url} -> {dst_path} failed: {err}")
        return False
    print(f"download {url} -> {dst_path} sp = {time.time() - st}")
    return os.path.exists(dst_path)
def exec_cmd(cmd):
    """Echo ``cmd`` to stdout, run it through ``os.system`` and report success.

    :param cmd: shell command line to execute
    :return: True when ``os.system`` returns 0, otherwise False
    """
    print(cmd)
    return os.system(cmd) == 0
def exec_cmd_and_result(cmd):
    """Run ``cmd`` through ``os.popen`` and return everything it wrote to stdout.

    :param cmd: shell command line to execute
    :return: the command's standard output as a string
    """
    with os.popen(cmd) as pipe:
        return pipe.read()
def upload_file2cos(key, file_path, region='ap-singapore', bucket_name='av-audit-sync-sg-1256122840'):
    """
    Upload a local file to COS with the ``coscmd`` CLI and verify the stored object.

    :param key: object key inside the bucket
    :param file_path: path of the local file to upload
    :param region: COS region
    :param bucket_name: bucket name
    :return: True when the upload command succeeds and the object's reported
        Content-Length is greater than zero, otherwise False
    """
    import shlex  # local import: only this function needs it

    gs_coscmd = "coscmd"
    gs_coscmd_conf = "~/.cos.conf"  # deliberately unquoted so the shell still expands "~"

    # The remaining arguments can originate from request data, so they are
    # shell-quoted. shlex.quote leaves ordinary names and paths unchanged.
    base = "{} -c {} -r {} -b {}".format(gs_coscmd, gs_coscmd_conf, shlex.quote(region), shlex.quote(bucket_name))
    quoted_key = shlex.quote(key)
    # expanduser keeps the "~" expansion the unquoted path used to get from the shell.
    quoted_path = shlex.quote(os.path.expanduser(file_path))

    if not exec_cmd("{} upload {} {}".format(base, quoted_path, quoted_key)):
        return False

    cmd = "{} info {}".format(base, quoted_key) + "| grep Content-Length |awk '{print $2}'"
    res_str = exec_cmd_and_result(cmd)
    logging.info("{},res={}".format(key, res_str))
    try:
        size = float(res_str)
    except ValueError:
        # Empty or non-numeric output: the object's size could not be read,
        # so the upload cannot be confirmed.
        return False
    return size > 0
+
+
def check_input(input_data):
    """Check that a job request carries every required field.

    :param input_data: decoded request payload, expected to be a dict
    :return: True when ``input_data`` is a dict containing all required keys,
        otherwise False (including ``None`` or any non-dict payload)
    """
    required_keys = ("record_song_url", "target_url", "start", "end", "vocal_loudness", "female_recording_url",
                     "male_recording_url")
    # A JSON body may decode to None, a list or a scalar; treat those as
    # invalid input instead of failing on a missing ``.keys()`` attribute.
    if not isinstance(input_data, dict):
        return False
    return all(key in input_data for key in required_keys)
diff --git a/AIMeiSheng/docker_demo/http_server.py b/AIMeiSheng/docker_demo/http_server.py
index a943980..ffa160e 100644
--- a/AIMeiSheng/docker_demo/http_server.py
+++ b/AIMeiSheng/docker_demo/http_server.py
@@ -1,84 +1,84 @@
# -*- coding: UTF-8 -*-
"""
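SVC processing logic
1. Determine the singer's gender from the given vocal_url
2. Pick the matching male or female source URL based on that gender
3. Run model inference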
"""
import gc
import os
import sys
+
+sys.path.append(os.path.dirname(__file__))
+sys.path.append(os.path.join(os.path.dirname(__file__), "../"))
+
import json
import time
import socket
import logging
import hashlib
from flask import Flask, jsonify, request, abort
from redis_helper import RedisHelper
-from offline_server import (gs_server_redis_conf, gs_redis_conf, check_input, gs_err_code_pending,
- gs_err_code_params, gs_err_code_too_many_connections)
-
-sys.path.append(os.path.dirname(__file__))
-sys.path.append(os.path.join(os.path.dirname(__file__), "../"))
+from common import *
# Global settings: per-host log file at INFO level, plus the Flask application object.
hostname = socket.gethostname()
log_file_name = f"/tmp/av_meisheng_http_{hostname}.log"
# NOTE(review): %I is the 12-hour clock and datefmt has no %p, so morning and
# afternoon timestamps are indistinguishable in the log; consider %H.
logging.basicConfig(filename=log_file_name, format='%(asctime)s %(levelname)s %(message)s', datefmt='%Y-%m-%d %I:%M:%S',
level=logging.INFO)
app = Flask(__name__)
class HttpServer:
    """Request-side front end: validates a job, enqueues it in Redis and returns the stored job state."""

    def __init__(self, redis_conf, server_conf):
        """
        :param redis_conf: connection settings handed to ``RedisHelper``
        :param server_conf: dict providing the "producer" queue name and the
            "ai_meisheng_key_prefix" used to build per-job keys
        """
        self.redis_helper = RedisHelper(redis_conf)
        self.server_conf = server_conf

    def process(self, in_data):
        """Register the job if it is new and return its current state.

        :param in_data: decoded request payload
        :return: dict with "status", "schedule", "gender" and "target_song_url".
            Status is ``gs_err_code_params`` for invalid input and
            ``gs_err_code_too_many_connections`` when more than 10 jobs are
            queued; otherwise the message stored under the job's key.
        """
        msg = {
            "status": gs_err_code_params,
            "schedule": 100,
            "gender": "unknown",
            "target_song_url": "",
        }
        if not check_input(in_data):
            return msg
        if self.redis_helper.llen(self.server_conf["producer"]) > 10:
            msg["status"] = gs_err_code_too_many_connections
            return msg

        # The job identity is the MD5 of the recording URL, so repeated
        # requests for the same recording map to the same Redis key.
        distinct_id = hashlib.md5(in_data["record_song_url"].encode()).hexdigest()
        distinct_key = self.server_conf["ai_meisheng_key_prefix"] + distinct_id
        if not self.redis_helper.exists(distinct_key):
            msg["status"] = gs_err_code_pending
            self.redis_helper.set(distinct_key, json.dumps(msg))
            self.redis_helper.lpush(self.server_conf["producer"], json.dumps(in_data))
            self.redis_helper.expire(distinct_key, 15)

        stored = self.redis_helper.get(distinct_key)
        if stored is None:
            # The key can expire between the exists() check and this read.
            # Report the job as pending instead of failing in json.loads(None).
            # NOTE(review): assumes RedisHelper.get returns None for a missing key - confirm.
            msg["status"] = gs_err_code_pending
            return msg
        return json.loads(stored)
gs_http_server = HttpServer(gs_redis_conf, gs_server_redis_conf)
@app.route("/ai_meisheng", methods=["POST"])
def ai_meisheng():
    """POST /ai_meisheng: hand the JSON body to the HTTP server and return its result as JSON."""
    data = request.json
    st = time.time()
    logging.info(f"ai_meisheng:in:{data}")
    msg = gs_http_server.process(data)
    # Log the message dict itself: formatting the Flask response object only
    # prints its repr, not the payload that was returned.
    logging.info(f"ai_meisheng:out:{data}-{msg}, sp={time.time() - st}")
    return jsonify(msg)
if __name__ == "__main__":
app.run(host='0.0.0.0', port=5000, threaded=False)
diff --git a/AIMeiSheng/docker_demo/offline_server.py b/AIMeiSheng/docker_demo/offline_server.py
index 134f7f9..8aff81d 100644
--- a/AIMeiSheng/docker_demo/offline_server.py
+++ b/AIMeiSheng/docker_demo/offline_server.py
@@ -1,189 +1,157 @@
# -*- coding: UTF-8 -*-
"""
Offline processing: interacts through Redis - fetches job data from Redis, then writes the results back to Redis
"""
import os
import sys
import time
import json
import socket
import hashlib
import logging
from redis_helper import RedisHelper
-from common import download2disk, exec_cmd, upload_file2cos
+from common import *
from svc_online import GSWorkerAttr, SVCOnline, volume_adjustment
# Per-host log file; records at INFO level and above are written to it.
hostname = socket.gethostname()
log_file_name = f"/tmp/av_meisheng_{hostname}.log"
# NOTE(review): %I is the 12-hour clock and datefmt has no %p, so morning and
# afternoon timestamps are indistinguishable in the log; consider %H.
logging.basicConfig(filename=log_file_name, format='%(asctime)s %(levelname)s %(message)s', datefmt='%Y-%m-%d %I:%M:%S',
level=logging.INFO)
-# errcode
-gs_err_code_success = 0
-gs_err_code_download_vocal = 100
-gs_err_code_download_svc_url = 101
-gs_err_code_svc_process = 102
-gs_err_code_transcode = 103
-gs_err_code_volume_adjust = 104
-gs_err_code_upload = 105
-gs_err_code_params = 106
-gs_err_code_pending = 107
-gs_err_code_too_many_connections = 429
-
sys.path.append(os.path.dirname(__file__))
sys.path.append(os.path.join(os.path.dirname(__file__), "../"))
-gs_redis_conf = {
- "host": "av-credis.starmaker.co",
- "port": 6379,
- "pwd": "lKoWEhz%jxTO",
-}
-
-gs_server_redis_conf = {
- "producer": "ai_meisheng_producer", # 输入的队列
- "ai_meisheng_key_prefix": "ai_meisheng_key_", # 存储结果情况
-}
-
def download_data(worker_attr):
    """Fetch the vocal recording and both SVC source recordings for one job.

    The vocal file is downloaded on every call. The female and male source
    files are downloaded only when their local path does not exist yet.

    :param worker_attr: job attributes exposing ``tmp_dir``, ``distinct_id``,
        ``vocal_url``/``vocal_path`` and the ``female_``/``male_``
        ``svc_source_url``/``svc_source_path`` pairs
    :return: ``gs_err_code_success``, ``gs_err_code_download_vocal`` or
        ``gs_err_code_download_svc_url``
    """
    # NOTE(review): the stale file is removed at tmp_dir/distinct_id, while the
    # download below writes to worker_attr.vocal_path - confirm they are the same path.
    stale_vocal = os.path.join(worker_attr.tmp_dir, worker_attr.distinct_id)
    if os.path.exists(stale_vocal):
        os.remove(stale_vocal)

    began = time.time()
    vocal_ok = download2disk(worker_attr.vocal_url, worker_attr.vocal_path)
    if not vocal_ok:
        return gs_err_code_download_vocal
    logging.info(f"download vocal_url={worker_attr.vocal_url} sp = {time.time() - began}")

    # Female first, then male; attributes are read lazily so a failure on the
    # first source never touches the second one.
    for gender in ("female", "male"):
        local_path = getattr(worker_attr, f"{gender}_svc_source_path")
        if os.path.exists(local_path):
            continue
        began = time.time()
        source_url = getattr(worker_attr, f"{gender}_svc_source_url")
        if not download2disk(source_url, local_path):
            return gs_err_code_download_svc_url
        logging.info(f"download {gender}_url={source_url} sp = {time.time() - began}")

    return gs_err_code_success
def transcode(wav_path, dst_path):
    """Convert ``wav_path`` to ``dst_path`` with ffmpeg (44.1 kHz, 2 channels, 64k audio bitrate).

    :param wav_path: input audio file
    :param dst_path: output file, overwritten if present (``-y``)
    :return: True when ``dst_path`` exists after the command ran
    """
    began = time.time()
    cmd = " ".join([
        "ffmpeg",
        f"-i {wav_path}",
        "-ar 44100",
        "-ac 2",
        "-b:a 64k",
        f"-y {dst_path}",
        "-loglevel fatal",
    ])
    # The exit status is ignored on purpose: success is judged by the output file.
    exec_cmd(cmd)
    elapsed = time.time() - began
    logging.info(f"transcode cmd={cmd}, sp = {elapsed}")
    return os.path.exists(dst_path)
-def check_input(input_data):
- key_list = ["record_song_url", "target_url", "start", "end", "vocal_loudness", "female_recording_url",
- "male_recording_url"]
- for key in key_list:
- if key not in input_data.keys():
- return False
- return True
-
-
class OfflineServer:
    """Redis-driven worker: pops jobs from the producer queue, runs the SVC pipeline and stores progress in Redis."""

    def __init__(self, redis_conf, server_conf, update_redis=False):
        """
        :param redis_conf: connection settings handed to ``RedisHelper``
        :param server_conf: dict providing the "producer" queue name and the
            "ai_meisheng_key_prefix" used to build per-job keys
        :param update_redis: when True, ``update_result`` writes to Redis;
            otherwise it does nothing
        """
        self.redis_helper = RedisHelper(redis_conf)
        self.svc_online = SVCOnline()
        self.server_conf = server_conf
        # Holds only the prefix until a job is picked up; process() and
        # process_one() append the job's distinct_id.
        self.distinct_key = server_conf["ai_meisheng_key_prefix"]
        self.update_redis = update_redis

    def exists(self):
        """Return whether the current job's key is present in Redis."""
        return self.redis_helper.exists(self.distinct_key)

    def update_result(self, errcode, schedule, gender, target_song_url):
        """Store the current job state under ``self.distinct_key``.

        :param errcode: one of the ``gs_err_code_*`` values
        :param schedule: progress value (the pipeline uses 35/85/90/95/100)
        :param gender: gender reported by the SVC step, or "unknown"
        :param target_song_url: URL the result is (to be) uploaded to
        """
        msg = {
            "status": errcode,
            "schedule": schedule,
            "gender": gender,
            "target_song_url": target_song_url,
        }
        # The result is kept for 10 minutes (60 * 10 seconds).
        # NOTE(review): the previous comment said 15 min; confirm the intended TTL.
        if self.update_redis:
            self.redis_helper.set(self.distinct_key, json.dumps(msg))
            self.redis_helper.expire(self.distinct_key, 60 * 10)

    def process_one(self, worker_attr):
        """Run the full pipeline for one job: download, SVC, loudness, transcode, upload.

        Progress is published through ``update_result`` after every stage.

        :param worker_attr: job attributes (a ``GSWorkerAttr``)
        :return: ``(errcode, target_url, gender)``; the last two are None on failure
        """
        self.distinct_key = self.server_conf["ai_meisheng_key_prefix"] + worker_attr.distinct_id

        logging.info(f"{worker_attr.log_info_name()}, start download ...")
        err = download_data(worker_attr)
        if err != gs_err_code_success:
            self.update_result(err, 100, "unknown", worker_attr.target_url)
            return err, None, None
        self.update_result(err, 35, "unknown", worker_attr.target_url)

        logging.info(f"{worker_attr.log_info_name()}, start process ...")
        gender = self.svc_online.process(worker_attr)
        if not os.path.exists(worker_attr.target_wav_path):
            self.update_result(gs_err_code_svc_process, 100, gender, worker_attr.target_url)
            return gs_err_code_svc_process, None, None
        self.update_result(err, 85, gender, worker_attr.target_url)

        # Stretch the volume to the requested loudness.
        logging.info(f"{worker_attr.log_info_name()}, start volume_adjustment ...")
        volume_adjustment(worker_attr.target_wav_path, worker_attr.target_loudness, worker_attr.target_wav_ad_path)
        if not os.path.exists(worker_attr.target_wav_ad_path):
            self.update_result(gs_err_code_volume_adjust, 100, gender, worker_attr.target_url)
            return gs_err_code_volume_adjust, None, None
        self.update_result(err, 90, gender, worker_attr.target_url)

        # Transcode the loudness-adjusted file. Feeding target_wav_path here
        # would throw away the adjustment made in the previous step.
        logging.info(f"{worker_attr.log_info_name()}, start transcode ...")
        if not transcode(worker_attr.target_wav_ad_path, worker_attr.target_path):
            self.update_result(gs_err_code_transcode, 100, gender, worker_attr.target_url)
            return gs_err_code_transcode, None, None
        self.update_result(err, 95, gender, worker_attr.target_url)

        # Upload
        logging.info(f"{worker_attr.log_info_name()}, start upload_file2cos ...")
        st = time.time()
        # Split bucket name, region and object key out of target_url, e.g.
        # "http://starmaker-sv-1256122840.cos.na-siliconvalley.myqcloud.com/production/ai_voice/7036874317774285/xxalkdjfladjflkasdf-target.mp4"
        bucket_name = worker_attr.target_url.split(".")[0].split("//")[-1]
        region = worker_attr.target_url.split(".")[2]
        key = "/".join(worker_attr.target_url.split("/")[3:])
        logging.info(f"{worker_attr.log_info_name()}, start upload_file2cos {bucket_name}, {region}, {key}")
        if not upload_file2cos(key, worker_attr.target_path, region=region, bucket_name=bucket_name):
            self.update_result(gs_err_code_upload, 100, gender, worker_attr.target_url)
            return gs_err_code_upload, None, None

        self.update_result(gs_err_code_success, 100, gender, worker_attr.target_url)
        logging.info(f"{worker_attr.log_info_name()} upload {worker_attr.target_url} sp = {time.time() - st}")
        return gs_err_code_success, worker_attr.target_url, gender

    def process(self):
        """Worker loop: poll the producer queue forever and process each valid job."""
        while True:
            data = self.redis_helper.rpop(self.server_conf["producer"])
            if data is None:
                time.sleep(1)
                continue
            try:
                data = json.loads(data)
            except ValueError:
                # A malformed queue entry must not take the whole worker down.
                logging.error(f"input data is not valid json={data}")
                continue
            if not check_input(data):
                logging.error(f"input data error={data}")
                continue

            worker_attr = GSWorkerAttr(data)
            self.distinct_key = self.server_conf["ai_meisheng_key_prefix"] + worker_attr.distinct_id
            # The job key disappears once it expires; skip jobs that have
            # already timed out.
            if not self.exists():
                logging.warning(f"input {data}, timeout abandon ....")
                continue

            st = time.time()
            errcode, target_path, gender = self.process_one(worker_attr)
            # NOTE(review): process_one already stored the final state; on
            # failure this overwrites gender and target_song_url with None.
            self.update_result(errcode, 100, gender, target_path)
            logging.info(f"{worker_attr.log_info_name()} finish sp = {time.time() - st}")
if __name__ == '__main__':
offline_server = OfflineServer(gs_redis_conf, gs_server_redis_conf, True)
offline_server.process()

File Metadata

Mime Type
text/x-diff
Expires
Sun, Jan 12, 08:33 (1 d, 15 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1347196
Default Alt Text
(14 KB)

Event Timeline