Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F4880328
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
14 KB
Subscribers
None
View Options
diff --git a/AIMeiSheng/docker_demo/common.py b/AIMeiSheng/docker_demo/common.py
index 3eff425..25d701e 100644
--- a/AIMeiSheng/docker_demo/common.py
+++ b/AIMeiSheng/docker_demo/common.py
@@ -1,61 +1,93 @@
import os
import time
import logging
import urllib, urllib.request
gs_tmp_dir = "/tmp/ai_meisheng_tmp"
gs_model_dir = "/tmp/ai_meisheng_models"
gs_resource_cache_dir = "/tmp/ai_meisheng_resource_cache"
gs_svc_model_path = os.path.join(gs_model_dir,
"weights/xusong_v2_org_version_alldata_embed1_enzx_diff_fi_e15_s244110.pth")
gs_embed_model_path = os.path.join(gs_model_dir, "RawNet3/models/weights/model.pt")
gs_hubert_model_path = os.path.join(gs_model_dir, "hubert.pt")
gs_rmvpe_model_path = os.path.join(gs_model_dir, "rmvpe.pt")
+# errcode
+gs_err_code_success = 0
+gs_err_code_download_vocal = 100
+gs_err_code_download_svc_url = 101
+gs_err_code_svc_process = 102
+gs_err_code_transcode = 103
+gs_err_code_volume_adjust = 104
+gs_err_code_upload = 105
+gs_err_code_params = 106
+gs_err_code_pending = 107
+gs_err_code_too_many_connections = 429
+
+gs_redis_conf = {
+ "host": "av-credis.starmaker.co",
+ "port": 6379,
+ "pwd": "lKoWEhz%jxTO",
+}
+
+gs_server_redis_conf = {
+ "producer": "ai_meisheng_producer", # 输入的队列
+ "ai_meisheng_key_prefix": "ai_meisheng_key_", # 存储结果情况
+}
+
def download2disk(url, dst_path):
st = time.time()
urllib.request.urlretrieve(url, dst_path)
print(f"download {url} -> {dst_path} sp = {time.time() - st}")
return os.path.exists(dst_path)
def exec_cmd(cmd):
# gs_logger.info(cmd)
print(cmd)
ret = os.system(cmd)
if ret != 0:
return False
return True
def exec_cmd_and_result(cmd):
r = os.popen(cmd)
text = r.read()
r.close()
return text
def upload_file2cos(key, file_path, region='ap-singapore', bucket_name='av-audit-sync-sg-1256122840'):
"""
将文件上传到cos
:param key: 桶上的具体地址
:param file_path: 本地文件地址
:param region: 区域
:param bucket_name: 桶地址
:return:
"""
gs_coscmd = "coscmd"
gs_coscmd_conf = "~/.cos.conf"
cmd = "{} -c {} -r {} -b {} upload {} {}".format(gs_coscmd, gs_coscmd_conf, region, bucket_name, file_path, key)
if exec_cmd(cmd):
cmd = "{} -c {} -r {} -b {} info {}".format(gs_coscmd, gs_coscmd_conf, region, bucket_name, key) \
+ "| grep Content-Length |awk \'{print $2}\'"
res_str = exec_cmd_and_result(cmd)
logging.info("{},res={}".format(key, res_str))
size = float(res_str)
if size > 0:
return True
return False
return False
+
+
+def check_input(input_data):
+ key_list = ["record_song_url", "target_url", "start", "end", "vocal_loudness", "female_recording_url",
+ "male_recording_url"]
+ for key in key_list:
+ if key not in input_data.keys():
+ return False
+ return True
diff --git a/AIMeiSheng/docker_demo/http_server.py b/AIMeiSheng/docker_demo/http_server.py
index a943980..ffa160e 100644
--- a/AIMeiSheng/docker_demo/http_server.py
+++ b/AIMeiSheng/docker_demo/http_server.py
@@ -1,84 +1,84 @@
# -*- coding: UTF-8 -*-
"""
SVC处理逻辑
1. 根据跟定的vocal_url 判别男女
2. 根据男女信息选择适合的男女url
3. 模型推理
"""
import gc
import os
import sys
+
+sys.path.append(os.path.dirname(__file__))
+sys.path.append(os.path.join(os.path.dirname(__file__), "../"))
+
import json
import time
import socket
import logging
import hashlib
from flask import Flask, jsonify, request, abort
from redis_helper import RedisHelper
-from offline_server import (gs_server_redis_conf, gs_redis_conf, check_input, gs_err_code_pending,
- gs_err_code_params, gs_err_code_too_many_connections)
-
-sys.path.append(os.path.dirname(__file__))
-sys.path.append(os.path.join(os.path.dirname(__file__), "../"))
+from common import *
# 全局设置
hostname = socket.gethostname()
log_file_name = f"/tmp/av_meisheng_http_{hostname}.log"
logging.basicConfig(filename=log_file_name, format='%(asctime)s %(levelname)s %(message)s', datefmt='%Y-%m-%d %I:%M:%S',
level=logging.INFO)
app = Flask(__name__)
class HttpServer:
def __init__(self, redis_conf, server_conf):
self.redis_helper = RedisHelper(redis_conf)
self.server_conf = server_conf
def process(self, in_data):
msg = {
"status": gs_err_code_params,
"schedule": 100,
"gender": "unknown",
"target_song_url": "",
}
if not check_input(in_data):
return msg
if self.redis_helper.llen(self.server_conf["producer"]) > 10:
msg["status"] = gs_err_code_too_many_connections
return msg
distinct_id = hashlib.md5(in_data["record_song_url"].encode()).hexdigest()
distinct_key = self.server_conf["ai_meisheng_key_prefix"] + distinct_id
if not self.redis_helper.exists(distinct_key):
msg["status"] = gs_err_code_pending
self.redis_helper.set(distinct_key, json.dumps(msg))
self.redis_helper.lpush(self.server_conf["producer"], json.dumps(in_data))
self.redis_helper.expire(distinct_key, 15)
msg = self.redis_helper.get(distinct_key)
return json.loads(msg)
gs_http_server = HttpServer(gs_redis_conf, gs_server_redis_conf)
@app.route("/ai_meisheng", methods=["POST"])
def ai_meisheng():
data = request.json
st = time.time()
logging.info(f"ai_meisheng:in:{data}")
msg = gs_http_server.process(data)
json_msg = jsonify(msg)
logging.info(f"ai_meisheng:out:{data}-{json_msg}, sp={time.time() - st}")
return json_msg
if __name__ == "__main__":
app.run(host='0.0.0.0', port=5000, threaded=False)
diff --git a/AIMeiSheng/docker_demo/offline_server.py b/AIMeiSheng/docker_demo/offline_server.py
index 134f7f9..8aff81d 100644
--- a/AIMeiSheng/docker_demo/offline_server.py
+++ b/AIMeiSheng/docker_demo/offline_server.py
@@ -1,189 +1,157 @@
# -*- coding: UTF-8 -*-
"""
离线处理: 使用redis进行交互,从redis中获取数据资源,在将结果写入到redis
"""
import os
import sys
import time
import json
import socket
import hashlib
import logging
from redis_helper import RedisHelper
-from common import download2disk, exec_cmd, upload_file2cos
+from common import *
from svc_online import GSWorkerAttr, SVCOnline, volume_adjustment
hostname = socket.gethostname()
log_file_name = f"/tmp/av_meisheng_{hostname}.log"
logging.basicConfig(filename=log_file_name, format='%(asctime)s %(levelname)s %(message)s', datefmt='%Y-%m-%d %I:%M:%S',
level=logging.INFO)
-# errcode
-gs_err_code_success = 0
-gs_err_code_download_vocal = 100
-gs_err_code_download_svc_url = 101
-gs_err_code_svc_process = 102
-gs_err_code_transcode = 103
-gs_err_code_volume_adjust = 104
-gs_err_code_upload = 105
-gs_err_code_params = 106
-gs_err_code_pending = 107
-gs_err_code_too_many_connections = 429
-
sys.path.append(os.path.dirname(__file__))
sys.path.append(os.path.join(os.path.dirname(__file__), "../"))
-gs_redis_conf = {
- "host": "av-credis.starmaker.co",
- "port": 6379,
- "pwd": "lKoWEhz%jxTO",
-}
-
-gs_server_redis_conf = {
- "producer": "ai_meisheng_producer", # 输入的队列
- "ai_meisheng_key_prefix": "ai_meisheng_key_", # 存储结果情况
-}
-
def download_data(worker_attr):
vocal_path = os.path.join(worker_attr.tmp_dir, worker_attr.distinct_id)
if os.path.exists(vocal_path):
os.remove(vocal_path)
st = time.time()
if not download2disk(worker_attr.vocal_url, worker_attr.vocal_path):
return gs_err_code_download_vocal
logging.info(f"download vocal_url={worker_attr.vocal_url} sp = {time.time() - st}")
# download svc_source_url
if not os.path.exists(worker_attr.female_svc_source_path):
st = time.time()
if not download2disk(worker_attr.female_svc_source_url, worker_attr.female_svc_source_path):
return gs_err_code_download_svc_url
logging.info(f"download female_url={worker_attr.female_svc_source_url} sp = {time.time() - st}")
# download svc_source_url
if not os.path.exists(worker_attr.male_svc_source_path):
st = time.time()
if not download2disk(worker_attr.male_svc_source_url, worker_attr.male_svc_source_path):
return gs_err_code_download_svc_url
logging.info(f"download male_url={worker_attr.male_svc_source_url} sp = {time.time() - st}")
return gs_err_code_success
def transcode(wav_path, dst_path):
st = time.time()
cmd = f"ffmpeg -i {wav_path} -ar 44100 -ac 2 -b:a 64k -y {dst_path} -loglevel fatal"
exec_cmd(cmd)
logging.info(f"transcode cmd={cmd}, sp = {time.time() - st}")
return os.path.exists(dst_path)
-def check_input(input_data):
- key_list = ["record_song_url", "target_url", "start", "end", "vocal_loudness", "female_recording_url",
- "male_recording_url"]
- for key in key_list:
- if key not in input_data.keys():
- return False
- return True
-
-
class OfflineServer:
def __init__(self, redis_conf, server_conf, update_redis=False):
self.redis_helper = RedisHelper(redis_conf)
self.svc_online = SVCOnline()
self.server_conf = server_conf
self.distinct_key = server_conf["ai_meisheng_key_prefix"]
self.update_redis = update_redis
def exists(self):
return self.redis_helper.exists(self.distinct_key)
def update_result(self, errcode, schedule, gender, target_song_url):
msg = {
"status": errcode,
"schedule": schedule,
"gender": gender,
"target_song_url": target_song_url,
}
# 结果保存15min
if self.update_redis:
self.redis_helper.set(self.distinct_key, json.dumps(msg))
self.redis_helper.expire(self.distinct_key, 60 * 10)
def process_one(self, worker_attr):
self.distinct_key = self.server_conf["ai_meisheng_key_prefix"] + worker_attr.distinct_id
logging.info(f"{worker_attr.log_info_name()}, start download ...")
err = download_data(worker_attr)
if err != gs_err_code_success:
self.update_result(err, 100, "unknown", worker_attr.target_url)
return err, None, None
self.update_result(err, 35, "unknown", worker_attr.target_url)
logging.info(f"{worker_attr.log_info_name()}, start process ...")
gender = self.svc_online.process(worker_attr)
if not os.path.exists(worker_attr.target_wav_path):
self.update_result(gs_err_code_svc_process, 100, gender, worker_attr.target_url)
return gs_err_code_svc_process, None, None
self.update_result(err, 85, gender, worker_attr.target_url)
# 音量拉伸到指定响度
logging.info(f"{worker_attr.log_info_name()}, start volume_adjustment ...")
volume_adjustment(worker_attr.target_wav_path, worker_attr.target_loudness, worker_attr.target_wav_ad_path)
if not os.path.exists(worker_attr.target_wav_ad_path):
self.update_result(gs_err_code_volume_adjust, 100, gender, worker_attr.target_url)
return gs_err_code_volume_adjust, None, None
self.update_result(err, 90, gender, worker_attr.target_url)
# transcode
logging.info(f"{worker_attr.log_info_name()}, start transcode ...")
if not transcode(worker_attr.target_wav_path, worker_attr.target_path):
self.update_result(gs_err_code_transcode, 100, gender, worker_attr.target_url)
return gs_err_code_transcode, None, None
self.update_result(err, 95, gender, worker_attr.target_url)
# upload
logging.info(f"{worker_attr.log_info_name()}, start upload_file2cos ...")
st = time.time()
# 从target_url 分离出bucket_name,ap,和key
# "http://starmaker-sv-1256122840.cos.na-siliconvalley.myqcloud.com/production/ai_voice/7036874317774285/xxalkdjfladjflkasdf-target.mp4",
bucket_name = worker_attr.target_url.split(".")[0].split("//")[-1]
region = worker_attr.target_url.split(".")[2]
key = "/".join(worker_attr.target_url.split("/")[3:])
logging.info(f"{worker_attr.log_info_name()}, start upload_file2cos {bucket_name}, {region}, {key}")
if not upload_file2cos(key, worker_attr.target_path, region=region, bucket_name=bucket_name):
self.update_result(gs_err_code_upload, 100, gender, worker_attr.target_url)
return gs_err_code_upload, None, None
self.update_result(gs_err_code_success, 100, gender, worker_attr.target_url)
logging.info(f"{worker_attr.log_info_name()} upload {worker_attr.target_url} sp = {time.time() - st}")
return gs_err_code_success, worker_attr.target_url, gender
def process(self):
while True:
data = self.redis_helper.rpop(self.server_conf["producer"])
if data is None:
time.sleep(1)
continue
data = json.loads(data)
if not check_input(data):
logging.error(f"input data error={data}")
continue
worker_attr = GSWorkerAttr(data)
self.distinct_key = self.server_conf["ai_meisheng_key_prefix"] + worker_attr.distinct_id
if not self.exists():
logging.warning(f"input {data}, timeout abandon ....")
continue
st = time.time()
errcode, target_path, gender = self.process_one(worker_attr)
self.update_result(errcode, 100, gender, target_path)
logging.info(f"{worker_attr.log_info_name()} finish sp = {time.time() - st}")
if __name__ == '__main__':
offline_server = OfflineServer(gs_redis_conf, gs_server_redis_conf, True)
offline_server.process()
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sun, Jan 12, 08:33 (1 d, 15 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1347196
Default Alt Text
(14 KB)
Attached To
R350 av_svc
Event Timeline
Log In to Comment