Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F4880594
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
7 KB
Subscribers
None
View Options
diff --git a/AIMeiSheng/docker_demo/offline_server.py b/AIMeiSheng/docker_demo/offline_server.py
index e6e1594..48205ea 100644
--- a/AIMeiSheng/docker_demo/offline_server.py
+++ b/AIMeiSheng/docker_demo/offline_server.py
@@ -1,153 +1,153 @@
# -*- coding: UTF-8 -*-
"""
离线处理: 使用redis进行交互,从redis中获取数据资源,在将结果写入到redis
"""
import os
import sys
import time
import json
import socket
import hashlib
from redis_helper import RedisHelper
from cos_helper import CosHelper
from common import *
import logging
from svc_online import GSWorkerAttr, SVCOnline, volume_adjustment, svc_offline_logger
sys.path.append(os.path.dirname(__file__))
sys.path.append(os.path.join(os.path.dirname(__file__), "../"))
def download_data(worker_attr):
if os.path.exists(worker_attr.vocal_path):
os.unlink(worker_attr.vocal_path)
st = time.time()
if not download2disk(worker_attr.vocal_url, worker_attr.vocal_path):
return gs_err_code_download_vocal
svc_offline_logger.info(f"download vocal_url={worker_attr.vocal_url} sp = {time.time() - st}")
# download svc_source_url
if not os.path.exists(worker_attr.female_svc_source_path):
st = time.time()
if not download2disk(worker_attr.female_svc_source_url, worker_attr.female_svc_source_path):
return gs_err_code_download_svc_url
svc_offline_logger.info(f"download female_url={worker_attr.female_svc_source_url} sp = {time.time() - st}")
# download svc_source_url
if not os.path.exists(worker_attr.male_svc_source_path):
st = time.time()
if not download2disk(worker_attr.male_svc_source_url, worker_attr.male_svc_source_path):
return gs_err_code_download_svc_url
svc_offline_logger.info(f"download male_url={worker_attr.male_svc_source_url} sp = {time.time() - st}")
return gs_err_code_success
def transcode(wav_path, dst_path):
st = time.time()
cmd = f"ffmpeg -i {wav_path} -ar 44100 -ac 1 -b:a 64k -y {dst_path} -loglevel fatal"
exec_cmd(cmd)
svc_offline_logger.info(f"transcode cmd={cmd}, sp = {time.time() - st}")
return os.path.exists(dst_path)
class OfflineServer:
def __init__(self, redis_conf, server_conf, update_redis=False):
st = time.time()
self.redis_helper = RedisHelper(redis_conf)
self.cos_helper = CosHelper()
self.svc_online = SVCOnline()
self.server_conf = server_conf
self.distinct_key = server_conf["ai_meisheng_key_prefix"]
self.update_redis = update_redis
svc_offline_logger.info(f"config={redis_conf}---server_conf={self.server_conf}")
svc_offline_logger.info(f"offline init finish sp={time.time() - st}")
def exists(self):
return self.redis_helper.exists(self.distinct_key)
def update_result(self, errcode, schedule, gender, target_song_url):
msg = {
"status": errcode,
"schedule": schedule,
"gender": gender,
"target_song_url": target_song_url,
}
# 结果保存15min
if self.update_redis:
self.redis_helper.set(self.distinct_key, json.dumps(msg))
self.redis_helper.expire(self.distinct_key, 60 * 10)
def process_one(self, worker_attr):
self.distinct_key = self.server_conf["ai_meisheng_key_prefix"] + worker_attr.distinct_id
svc_offline_logger.info(f"{worker_attr.log_info_name()}, start download ...")
err = download_data(worker_attr)
if err != gs_err_code_success:
self.update_result(err, 100, "unknown", worker_attr.target_url)
return err, None, None
self.update_result(err, 35, "unknown", worker_attr.target_url)
svc_offline_logger.info(f"{worker_attr.log_info_name()}, start process ...")
gender, err_code = self.svc_online.process(worker_attr)
if err_code == gs_err_code_target_silence : #unvoice err
return gs_err_code_target_silence, None, None
if not os.path.exists(worker_attr.target_wav_path):
self.update_result(gs_err_code_svc_process, 100, gender, worker_attr.target_url)
return gs_err_code_svc_process, None, None
self.update_result(err, 85, gender, worker_attr.target_url)
# 音量拉伸到指定响度
svc_offline_logger.info(f"{worker_attr.log_info_name()}, start volume_adjustment ...")
volume_adjustment(worker_attr.target_wav_path, worker_attr.target_loudness, worker_attr.target_wav_ad_path)
if not os.path.exists(worker_attr.target_wav_ad_path):
self.update_result(gs_err_code_volume_adjust, 100, gender, worker_attr.target_url)
return gs_err_code_volume_adjust, None, None
self.update_result(err, 90, gender, worker_attr.target_url)
# transcode
svc_offline_logger.info(f"{worker_attr.log_info_name()}, start transcode ...")
- if not transcode(worker_attr.target_wav_path, worker_attr.target_path):
+ if not transcode(worker_attr.target_wav_ad_path, worker_attr.target_path):
self.update_result(gs_err_code_transcode, 100, gender, worker_attr.target_url)
return gs_err_code_transcode, None, None
self.update_result(err, 95, gender, worker_attr.target_url)
# upload
svc_offline_logger.info(f"{worker_attr.log_info_name()}, start upload_file2cos ...")
st = time.time()
if not self.cos_helper.upload_by_url(worker_attr.target_path, worker_attr.target_url):
self.update_result(gs_err_code_upload, 100, gender, worker_attr.target_url)
return gs_err_code_upload, None, None
self.update_result(gs_err_code_success, 100, gender, worker_attr.target_url)
svc_offline_logger.info(f"{worker_attr.log_info_name()} upload {worker_attr.target_url} sp = {time.time() - st}")
return gs_err_code_success, worker_attr.target_url, gender
def process(self):
while True:
data = self.redis_helper.rpop(self.server_conf["producer"])
if data is None:
time.sleep(1)
continue
data = json.loads(data)
if not check_input(data):
svc_offline_logger.error(f"input data error={data}")
continue
worker_attr = GSWorkerAttr(data)
self.distinct_key = self.server_conf["ai_meisheng_key_prefix"] + worker_attr.distinct_id
if not self.exists():
svc_offline_logger.warning(f"input {data}, timeout abandon ....")
worker_attr.rm_cache()
continue
st = time.time()
errcode, target_path, gender = self.process_one(worker_attr)
self.update_result(errcode, 100, gender, target_path)
svc_offline_logger.info(f"{worker_attr.log_info_name()} finish errcode={errcode} sp = {time.time() - st}")
worker_attr.rm_cache()
if __name__ == '__main__':
offline_server = OfflineServer(gs_redis_conf, gs_server_redis_conf, True)
offline_server.process()
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sun, Jan 12, 14:27 (1 d, 17 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1346085
Default Alt Text
(7 KB)
Attached To
R350 av_svc
Event Timeline
Log In to Comment