Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F4880338
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
9 KB
Subscribers
None
View Options
diff --git a/AIMeiSheng/docker_demo/http_server.py b/AIMeiSheng/docker_demo/http_server.py
index f329506..4649ad5 100644
--- a/AIMeiSheng/docker_demo/http_server.py
+++ b/AIMeiSheng/docker_demo/http_server.py
@@ -1,83 +1,83 @@
# -*- coding: UTF-8 -*-
"""
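SVC processing logic
1. Determine the singer's gender from the given vocal_url
2. Select the matching male or female source url based on that gender
3. Run model inference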
"""
import gc
import os
import sys
sys.path.append(os.path.dirname(__file__))
sys.path.append(os.path.join(os.path.dirname(__file__), "../"))
import json
import time
import socket
import logging
import hashlib
from flask import Flask, jsonify, request, abort
from redis_helper import RedisHelper
from common import *
# Global setup: root logger configuration and the Flask application object.
# The timestamp uses %H (24-hour clock). The previous %I (12-hour clock) was
# used without an AM/PM marker, so morning and afternoon entries could not be
# told apart.
logging.basicConfig(
    format='%(asctime)s %(levelname)s %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.INFO,
)
app = Flask(__name__)
# Front-end request handler: validates a request, enqueues it on the Redis
# list named by server_conf["producer"], and returns the status JSON stored
# under a per-request Redis key.
# NOTE(review): indentation was lost in this copy of the file, so block
# nesting below has to be inferred from the statements themselves.
class HttpServer:
# Build the Redis helper from redis_conf and keep server_conf, which supplies
# the "producer" queue name and the "ai_meisheng_key_prefix" key prefix.
def __init__(self, redis_conf, server_conf):
st = time.time()
self.redis_helper = RedisHelper(redis_conf)
self.server_conf = server_conf
logging.info(f"HttpServer init conf={redis_conf}, {server_conf} sp={time.time() - st}")
# Handle one request dict and return a status dict with the keys
# "status", "schedule", "gender" and "target_song_url".
def process(self, in_data):
# Default reply: parameter error, returned as-is when check_input fails.
msg = {
"status": gs_err_code_params,
"schedule": 100,
"gender": "unknown",
"target_song_url": "",
}
if not check_input(in_data):
return msg
# Reject new work while more than 10 jobs are waiting in the queue.
if self.redis_helper.llen(self.server_conf["producer"]) > 10:
msg["status"] = gs_err_code_too_many_connections
return msg
# The job key is the configured prefix plus the MD5 hex digest of record_song_url,
# so repeated requests for the same recording map to the same key.
distinct_id = hashlib.md5(in_data["record_song_url"].encode()).hexdigest()
distinct_key = self.server_conf["ai_meisheng_key_prefix"] + distinct_id
# First request for this key: store a "pending" status and enqueue the job.
if not self.redis_helper.exists(distinct_key):
msg["status"] = gs_err_code_pending
self.redis_helper.set(distinct_key, json.dumps(msg))
self.redis_helper.lpush(self.server_conf["producer"], json.dumps(in_data))
# NOTE(review): with indentation lost it is unclear whether this expire belongs
# to the `if` above (TTL set once at creation) or runs on every call (TTL
# refreshed on each poll) -- confirm against the repository. The TTL unit is
# presumably seconds.
self.redis_helper.expire(distinct_key, 15)
# NOTE(review): if the key is gone by the time of this read (e.g. it expired
# after the exists() check), get() presumably returns None and json.loads
# would raise TypeError -- confirm and consider handling that case.
msg = self.redis_helper.get(distinct_key)
return json.loads(msg)
# Shared handler instance, created once when the module is imported.
gs_http_server = HttpServer(redis_conf=gs_redis_conf, server_conf=gs_server_redis_conf)
@app.route("/ai_meisheng", methods=["POST"])
def ai_meisheng():
    """Handle ``POST /ai_meisheng``.

    Passes the request's JSON body to the shared ``HttpServer`` instance and
    returns its status dict as a JSON response. The input, the output and the
    elapsed time are logged.
    """
    payload = request.json
    started_at = time.time()
    logging.info(f"ai_meisheng:in:{payload}")
    result = gs_http_server.process(payload)
    response = jsonify(result)
    # Log the plain dict rather than the response object built from it.
    logging.info(f"ai_meisheng:out:{payload}-{result}, sp={time.time() - started_at}")
    return response
if __name__ == "__main__":
    # Script entry point: listen on all interfaces, port 5000, threaded=False.
    app.run(host="0.0.0.0", port=5000, threaded=False)
diff --git a/AIMeiSheng/docker_demo/offline_server.py b/AIMeiSheng/docker_demo/offline_server.py
index aa96faf..e6e1594 100644
--- a/AIMeiSheng/docker_demo/offline_server.py
+++ b/AIMeiSheng/docker_demo/offline_server.py
@@ -1,153 +1,153 @@
# -*- coding: UTF-8 -*-
"""
Offline processing: interacts through Redis -- fetches job data from Redis, then writes the results back to Redis
"""
import os
import sys
import time
import json
import socket
import hashlib
from redis_helper import RedisHelper
from cos_helper import CosHelper
from common import *
import logging
from svc_online import GSWorkerAttr, SVCOnline, volume_adjustment, svc_offline_logger
sys.path.append(os.path.dirname(__file__))
sys.path.append(os.path.join(os.path.dirname(__file__), "../"))
def download_data(worker_attr):
    """Fetch the input files for one job onto local disk.

    The vocal recording is always downloaded afresh: any file already at
    ``worker_attr.vocal_path`` is removed first. The female and male SVC
    source files are downloaded only when their local paths do not exist yet.

    Returns:
        ``gs_err_code_download_vocal`` or ``gs_err_code_download_svc_url`` for
        the first download that fails, otherwise ``gs_err_code_success``.
    """
    vocal_path = worker_attr.vocal_path
    if os.path.exists(vocal_path):
        os.unlink(vocal_path)

    started = time.time()
    if not download2disk(worker_attr.vocal_url, vocal_path):
        return gs_err_code_download_vocal
    svc_offline_logger.info(f"download vocal_url={worker_attr.vocal_url} sp = {time.time() - started}")

    # Both SVC sources follow the same "download unless already on disk" rule.
    svc_sources = (
        ("female", worker_attr.female_svc_source_url, worker_attr.female_svc_source_path),
        ("male", worker_attr.male_svc_source_url, worker_attr.male_svc_source_path),
    )
    for label, url, path in svc_sources:
        if os.path.exists(path):
            continue
        started = time.time()
        if not download2disk(url, path):
            return gs_err_code_download_svc_url
        svc_offline_logger.info(f"download {label}_url={url} sp = {time.time() - started}")
    return gs_err_code_success
def transcode(wav_path, dst_path):
    """Transcode ``wav_path`` to ``dst_path`` with ffmpeg.

    The command requests a 44100 Hz sample rate, one channel and a 64k audio
    bitrate, and overwrites an existing output file (``-y``).

    Args:
        wav_path: Path of the input audio file.
        dst_path: Path of the output file.

    Returns:
        True if ``dst_path`` exists after the command has run, else False.
    """
    # Local import: shlex is not among the file's top-level imports.
    import shlex

    started = time.time()
    # Quote both paths so that spaces or shell metacharacters cannot split or
    # alter the command. Paths made only of safe characters are left unchanged.
    # NOTE(review): assumes exec_cmd hands the string to a shell -- confirm.
    src_arg = shlex.quote(str(wav_path))
    dst_arg = shlex.quote(str(dst_path))
    cmd = f"ffmpeg -i {src_arg} -ar 44100 -ac 1 -b:a 64k -y {dst_arg} -loglevel fatal"
    exec_cmd(cmd)
    svc_offline_logger.info(f"transcode cmd={cmd}, sp = {time.time() - started}")
    return os.path.exists(dst_path)
class OfflineServer:
    """Offline worker that consumes jobs from Redis and publishes results there.

    Jobs are popped from the ``server_conf["producer"]`` list. Progress and the
    final result are stored as JSON under the key
    ``server_conf["ai_meisheng_key_prefix"] + worker_attr.distinct_id``.
    """

    def __init__(self, redis_conf, server_conf, update_redis=False):
        """Create the Redis and COS helpers and the SVC processing object.

        Args:
            redis_conf: Configuration passed to ``RedisHelper``.
            server_conf: Mapping that provides ``"ai_meisheng_key_prefix"``
                and ``"producer"``.
            update_redis: When False, ``update_result`` writes nothing.
        """
        st = time.time()
        self.redis_helper = RedisHelper(redis_conf)
        self.cos_helper = CosHelper()
        self.svc_online = SVCOnline()
        self.server_conf = server_conf
        # Only the prefix for now; process() and process_one() replace it with
        # the full per-job key.
        self.distinct_key = server_conf["ai_meisheng_key_prefix"]
        self.update_redis = update_redis
        svc_offline_logger.info(f"config={redis_conf}---server_conf={self.server_conf}")
        svc_offline_logger.info(f"offline init finish sp={time.time() - st}")

    def exists(self):
        """Return the Redis existence check result for the current job key."""
        return self.redis_helper.exists(self.distinct_key)

    def update_result(self, errcode, schedule, gender, target_song_url):
        """Store the job status as JSON under the current job key.

        Args:
            errcode: Status code stored as ``"status"``.
            schedule: Progress value stored as ``"schedule"``.
            gender: Value stored as ``"gender"``.
            target_song_url: Value stored as ``"target_song_url"``.

        Nothing is written unless ``self.update_redis`` is true.
        """
        msg = {
            "status": errcode,
            "schedule": schedule,
            "gender": gender,
            "target_song_url": target_song_url,
        }
        # Keep the result for 60 * 10 (presumably seconds, i.e. 10 minutes).
        if self.update_redis:
            self.redis_helper.set(self.distinct_key, json.dumps(msg))
            self.redis_helper.expire(self.distinct_key, 60 * 10)

    def process_one(self, worker_attr):
        """Run the full pipeline for one job.

        Steps: download inputs, SVC processing, volume adjustment, transcode,
        upload. Progress is reported through ``update_result`` after each
        step.

        Args:
            worker_attr: Job description (a ``GSWorkerAttr``).

        Returns:
            ``(errcode, target_url, gender)``. On any failure the last two
            elements are ``None``.
        """
        self.distinct_key = self.server_conf["ai_meisheng_key_prefix"] + worker_attr.distinct_id
        svc_offline_logger.info(f"{worker_attr.log_info_name()}, start download ...")
        err = download_data(worker_attr)
        if err != gs_err_code_success:
            self.update_result(err, 100, "unknown", worker_attr.target_url)
            return err, None, None
        self.update_result(err, 35, "unknown", worker_attr.target_url)

        svc_offline_logger.info(f"{worker_attr.log_info_name()}, start process ...")
        gender, err_code = self.svc_online.process(worker_attr)
        if err_code == gs_err_code_target_silence:  # target is silent / unvoiced
            return gs_err_code_target_silence, None, None
        if not os.path.exists(worker_attr.target_wav_path):
            self.update_result(gs_err_code_svc_process, 100, gender, worker_attr.target_url)
            return gs_err_code_svc_process, None, None
        self.update_result(err, 85, gender, worker_attr.target_url)

        # Adjust the volume to the target loudness.
        svc_offline_logger.info(f"{worker_attr.log_info_name()}, start volume_adjustment ...")
        volume_adjustment(worker_attr.target_wav_path, worker_attr.target_loudness, worker_attr.target_wav_ad_path)
        if not os.path.exists(worker_attr.target_wav_ad_path):
            self.update_result(gs_err_code_volume_adjust, 100, gender, worker_attr.target_url)
            return gs_err_code_volume_adjust, None, None
        self.update_result(err, 90, gender, worker_attr.target_url)

        # Transcode.
        # NOTE(review): the input here is target_wav_path, not the
        # volume-adjusted target_wav_ad_path produced just above -- confirm
        # whether that is intentional.
        svc_offline_logger.info(f"{worker_attr.log_info_name()}, start transcode ...")
        if not transcode(worker_attr.target_wav_path, worker_attr.target_path):
            self.update_result(gs_err_code_transcode, 100, gender, worker_attr.target_url)
            return gs_err_code_transcode, None, None
        self.update_result(err, 95, gender, worker_attr.target_url)

        # Upload.
        svc_offline_logger.info(f"{worker_attr.log_info_name()}, start upload_file2cos ...")
        st = time.time()
        if not self.cos_helper.upload_by_url(worker_attr.target_path, worker_attr.target_url):
            self.update_result(gs_err_code_upload, 100, gender, worker_attr.target_url)
            return gs_err_code_upload, None, None
        self.update_result(gs_err_code_success, 100, gender, worker_attr.target_url)
        svc_offline_logger.info(f"{worker_attr.log_info_name()} upload {worker_attr.target_url} sp = {time.time() - st}")
        return gs_err_code_success, worker_attr.target_url, gender

    def process(self):
        """Run the worker loop forever.

        Pops one job at a time from the ``"producer"`` list, sleeping one
        second when the list is empty. Entries that are not valid JSON or
        fail ``check_input`` are logged and skipped. Jobs whose Redis key no
        longer exists are abandoned.
        """
        while True:
            data = self.redis_helper.rpop(self.server_conf["producer"])
            if data is None:
                time.sleep(1)
                continue
            try:
                data = json.loads(data)
            except ValueError:
                # A malformed queue entry must not kill the worker loop;
                # treat it like any other invalid input.
                svc_offline_logger.error(f"input data error={data}")
                continue
            if not check_input(data):
                svc_offline_logger.error(f"input data error={data}")
                continue

            worker_attr = GSWorkerAttr(data)
            self.distinct_key = self.server_conf["ai_meisheng_key_prefix"] + worker_attr.distinct_id
            if not self.exists():
                svc_offline_logger.warning(f"input {data}, timeout abandon ....")
                worker_attr.rm_cache()
                continue

            st = time.time()
            errcode, target_path, gender = self.process_one(worker_attr)
            self.update_result(errcode, 100, gender, target_path)
            svc_offline_logger.info(f"{worker_attr.log_info_name()} finish errcode={errcode} sp = {time.time() - st}")
            worker_attr.rm_cache()
if __name__ == '__main__':
    # Script entry point: build a worker with update_redis=True and run its
    # processing loop.
    offline_server = OfflineServer(gs_redis_conf, gs_server_redis_conf, update_redis=True)
    offline_server.process()
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sun, Jan 12, 08:34 (1 d, 15 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1347205
Default Alt Text
(9 KB)
Attached To
R350 av_svc
Event Timeline
Log In to Comment