Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F5021484
offline_server.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
7 KB
Subscribers
None
offline_server.py
View Options
# -*- coding: UTF-8 -*-
"""
离线处理: 使用redis进行交互,从redis中获取数据资源,在将结果写入到redis
"""
import
os
import
sys
import
time
import
json
import
socket
import
hashlib
from
redis_helper
import
RedisHelper
from
cos_helper
import
CosHelper
from
common
import
*
import
logging
from
svc_online
import
GSWorkerAttr
,
SVCOnline
,
volume_adjustment
,
svc_offline_logger
sys
.
path
.
append
(
os
.
path
.
dirname
(
__file__
))
sys
.
path
.
append
(
os
.
path
.
join
(
os
.
path
.
dirname
(
__file__
),
"../"
))
def
download_data
(
worker_attr
):
if
os
.
path
.
exists
(
worker_attr
.
vocal_path
):
os
.
unlink
(
worker_attr
.
vocal_path
)
st
=
time
.
time
()
if
not
download2disk
(
worker_attr
.
vocal_url
,
worker_attr
.
vocal_path
):
return
gs_err_code_download_vocal
svc_offline_logger
.
info
(
f
"download vocal_url={worker_attr.vocal_url} sp = {time.time() - st}"
)
# download svc_source_url
if
not
os
.
path
.
exists
(
worker_attr
.
female_svc_source_path
):
st
=
time
.
time
()
if
not
download2disk
(
worker_attr
.
female_svc_source_url
,
worker_attr
.
female_svc_source_path
):
return
gs_err_code_download_svc_url
svc_offline_logger
.
info
(
f
"download female_url={worker_attr.female_svc_source_url} sp = {time.time() - st}"
)
# download svc_source_url
if
not
os
.
path
.
exists
(
worker_attr
.
male_svc_source_path
):
st
=
time
.
time
()
if
not
download2disk
(
worker_attr
.
male_svc_source_url
,
worker_attr
.
male_svc_source_path
):
return
gs_err_code_download_svc_url
svc_offline_logger
.
info
(
f
"download male_url={worker_attr.male_svc_source_url} sp = {time.time() - st}"
)
return
gs_err_code_success
def
transcode
(
wav_path
,
dst_path
):
st
=
time
.
time
()
cmd
=
f
"ffmpeg -i {wav_path} -ar 44100 -ac 1 -b:a 64k -y {dst_path} -loglevel fatal"
exec_cmd
(
cmd
)
svc_offline_logger
.
info
(
f
"transcode cmd={cmd}, sp = {time.time() - st}"
)
return
os
.
path
.
exists
(
dst_path
)
class
OfflineServer
:
def
__init__
(
self
,
redis_conf
,
server_conf
,
update_redis
=
False
):
st
=
time
.
time
()
self
.
redis_helper
=
RedisHelper
(
redis_conf
)
self
.
cos_helper
=
CosHelper
()
self
.
svc_online
=
SVCOnline
()
self
.
server_conf
=
server_conf
self
.
distinct_key
=
server_conf
[
"ai_meisheng_key_prefix"
]
self
.
update_redis
=
update_redis
svc_offline_logger
.
info
(
f
"config={redis_conf}---server_conf={self.server_conf}"
)
svc_offline_logger
.
info
(
f
"offline init finish sp={time.time() - st}"
)
def
exists
(
self
):
return
self
.
redis_helper
.
exists
(
self
.
distinct_key
)
def
update_result
(
self
,
errcode
,
schedule
,
gender
,
target_song_url
):
msg
=
{
"status"
:
errcode
,
"schedule"
:
schedule
,
"gender"
:
gender
,
"target_song_url"
:
target_song_url
,
}
# 结果保存15min
if
self
.
update_redis
:
self
.
redis_helper
.
set
(
self
.
distinct_key
,
json
.
dumps
(
msg
))
self
.
redis_helper
.
expire
(
self
.
distinct_key
,
60
*
10
)
def
process_one
(
self
,
worker_attr
):
self
.
distinct_key
=
self
.
server_conf
[
"ai_meisheng_key_prefix"
]
+
worker_attr
.
distinct_id
svc_offline_logger
.
info
(
f
"{worker_attr.log_info_name()}, start download ..."
)
err
=
download_data
(
worker_attr
)
if
err
!=
gs_err_code_success
:
self
.
update_result
(
err
,
100
,
"unknown"
,
worker_attr
.
target_url
)
return
err
,
None
,
None
self
.
update_result
(
err
,
35
,
"unknown"
,
worker_attr
.
target_url
)
if
not
transcode
(
worker_attr
.
vocal_path
,
worker_attr
.
vocal_tmp_path
):
return
gs_err_code_transcode
,
None
,
None
os
.
unlink
(
worker_attr
.
vocal_path
)
os
.
rename
(
worker_attr
.
vocal_tmp_path
,
worker_attr
.
vocal_path
)
svc_offline_logger
.
info
(
f
"{worker_attr.log_info_name()}, start process ..."
)
gender
,
err_code
=
self
.
svc_online
.
process
(
worker_attr
)
if
err_code
==
gs_err_code_target_silence
:
#unvoice err
return
gs_err_code_target_silence
,
None
,
None
if
not
os
.
path
.
exists
(
worker_attr
.
target_wav_path
):
self
.
update_result
(
gs_err_code_svc_process
,
100
,
gender
,
worker_attr
.
target_url
)
return
gs_err_code_svc_process
,
None
,
None
self
.
update_result
(
err
,
85
,
gender
,
worker_attr
.
target_url
)
# 音量拉伸到指定响度
svc_offline_logger
.
info
(
f
"{worker_attr.log_info_name()}, start volume_adjustment ..."
)
volume_adjustment
(
worker_attr
.
target_wav_path
,
worker_attr
.
target_loudness
,
worker_attr
.
target_wav_ad_path
)
if
not
os
.
path
.
exists
(
worker_attr
.
target_wav_ad_path
):
self
.
update_result
(
gs_err_code_volume_adjust
,
100
,
gender
,
worker_attr
.
target_url
)
return
gs_err_code_volume_adjust
,
None
,
None
self
.
update_result
(
err
,
90
,
gender
,
worker_attr
.
target_url
)
# transcode
svc_offline_logger
.
info
(
f
"{worker_attr.log_info_name()}, start transcode ..."
)
if
not
transcode
(
worker_attr
.
target_wav_ad_path
,
worker_attr
.
target_path
):
self
.
update_result
(
gs_err_code_transcode
,
100
,
gender
,
worker_attr
.
target_url
)
return
gs_err_code_transcode
,
None
,
None
self
.
update_result
(
err
,
95
,
gender
,
worker_attr
.
target_url
)
# upload
svc_offline_logger
.
info
(
f
"{worker_attr.log_info_name()}, start upload_file2cos ..."
)
st
=
time
.
time
()
if
not
self
.
cos_helper
.
upload_by_url
(
worker_attr
.
target_path
,
worker_attr
.
target_url
):
self
.
update_result
(
gs_err_code_upload
,
100
,
gender
,
worker_attr
.
target_url
)
return
gs_err_code_upload
,
None
,
None
self
.
update_result
(
gs_err_code_success
,
100
,
gender
,
worker_attr
.
target_url
)
svc_offline_logger
.
info
(
f
"{worker_attr.log_info_name()} upload {worker_attr.target_url} sp = {time.time() - st}"
)
return
gs_err_code_success
,
worker_attr
.
target_url
,
gender
def
process
(
self
):
while
True
:
data
=
self
.
redis_helper
.
rpop
(
self
.
server_conf
[
"producer"
])
if
data
is
None
:
time
.
sleep
(
1
)
continue
data
=
json
.
loads
(
data
)
if
not
check_input
(
data
):
svc_offline_logger
.
error
(
f
"input data error={data}"
)
continue
worker_attr
=
GSWorkerAttr
(
data
)
self
.
distinct_key
=
self
.
server_conf
[
"ai_meisheng_key_prefix"
]
+
worker_attr
.
distinct_id
if
not
self
.
exists
():
svc_offline_logger
.
warning
(
f
"input {data}, timeout abandon ...."
)
worker_attr
.
rm_cache
()
continue
st
=
time
.
time
()
errcode
,
target_path
,
gender
=
self
.
process_one
(
worker_attr
)
self
.
update_result
(
errcode
,
100
,
gender
,
target_path
)
svc_offline_logger
.
info
(
f
"{worker_attr.log_info_name()} finish errcode={errcode} sp = {time.time() - st}"
)
worker_attr
.
rm_cache
()
if
__name__
==
'__main__'
:
offline_server
=
OfflineServer
(
gs_redis_conf
,
gs_server_redis_conf
,
True
)
offline_server
.
process
()
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Mon, May 19, 06:10 (23 h, 20 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1433528
Default Alt Text
offline_server.py (7 KB)
Attached To
R350 av_svc
Event Timeline
Log In to Comment