# common.py
# -*- coding: utf-8 -*-
import logging
import os
import pymysql
from datetime import datetime, timedelta
import subprocess
import multiprocessing as mp
import time

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')


def write_file(filename, data):
    with open(filename, 'w') as f:
        for dt in data:
            dt = str(dt).strip('\n')
            f.write(str(dt) + "\n")


def read_file(filename):
    res = []
    with open(filename, 'r') as f:
        while True:
            line = f.readline()
            if not line:
                break
            res.append(line.strip())
    return res


def n_days_ago(n_time, days):
    """
    :param n_time: n_time => 20180719
    :param days:
    :return:
    """
    now_time = datetime.strptime(n_time, '%Y%m%d')
    delta = timedelta(days=days)
    n_days = now_time - delta
    return n_days.strftime("%Y%m%d")
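

# Example: n_days_ago("20180719", 7) returns "20180712".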


def connect_db(host="research-db-r1.starmaker.co", port=3306, user="root",
               passwd="Qrdl1130", db="rec"):
    logging.info("connect mysql host={} port={} user={} passwd={} db={}"
                 .format(host, port, user, passwd, db))
    return pymysql.connect(host=host, port=port, user=user, passwd=passwd, db=db)


def get_data_by_sql(sql):
    db = connect_db()
    db_cursor = db.cursor()
    if len(sql) < 100:
        logging.info("execute = {}".format(sql))
    else:
        logging.info("execute = {}...".format(sql[:100]))
    db_cursor.execute(sql)
    res = db_cursor.fetchall()
    db_cursor.close()
    db.close()
    logging.info("res size={}".format(len(res)))
    return res


def get_recording_msg_batch(filename=None):
    """
    Fetch recording messages in batches.
    Only recording_id / user_id are needed.
    :return:
    """
    rid_uid_label = []
    max_item = 100000
    ssql = "select r_id, r_user_id,sm_labels from recording where r_id > {} " \
           "and sm_labels like \"%male%\" order by r_id asc limit {}"
    current_id = 0
    while True:
        res = get_data_by_sql(ssql.format(current_id, max_item))
        if len(res) == 0:
            break
        current_id = res[-1][0]
        rid_uid_label.extend(res)
        logging.info("------current_size size={}".format(len(rid_uid_label)))
    # write the results to a file
    if filename:
        res_str = list(map(lambda x: ",".join(map(str, x)), rid_uid_label))
        write_file(filename, res_str)
    return rid_uid_label


def parse_label(label):
    label = str(label).lower()
    gender = "female"
    idx = label.find(gender)
    if idx >= 0:
        label = label.replace("female", "")
        idx2 = label.find("male")
        # discard labels that contain both female and male
        if idx2 >= 0:
            return ""
        return gender
    # check whether the label is male
    gender = "male"
    idx = label.find(gender)
    if idx >= 0:
        return gender
    return ""


def parse_labels(rid_uid_label, filename=None):
    res = []
    for rid, uid, label in rid_uid_label:
        gender = parse_label(label)
        if "" != gender:
            res.append((rid, uid, gender))
    if filename:
        res_str = list(map(lambda x: ",".join(map(str, x)), res))
        write_file(filename, res_str)
    return res


def parse_line(x):
    ss = str(x).strip().split(',')
    return ss[0], ss[1], ",".join(ss[2:])


def get_recording_cache(filename=None):
    """
    Load the data from a cache file when one is given.
    :param filename:
    :return:
    """
    if filename:
        res = read_file(filename)
        res = list(map(parse_line, res))
        return res
    return get_recording_msg_batch(filename)


def func_run_time(func):
    def wrapper(*args, **kw):
        local_time = time.time()
        ret = func(*args, **kw)
        logging.info('current Function [%s] run time is %.2f' % (func.__name__, time.time() - local_time))
        return ret
    return wrapper


def download_mp4(dir, recording_id):
    """
    1. Download the dry-vocal (a cappella) file.
    2. Rename it once the download finishes.
    """
    file_path = os.path.join(dir, recording_id)
    filename_download = file_path + ".download"
    filename = file_path + ".mp4"
    if os.path.exists(filename_download):
        os.unlink(filename_download)
    cmd = "coscmd -b starmaker-1256122840 download production/uploading/recordings/{}/origin_master.mp4 {}" \
        .format(recording_id, filename_download)
    # logging.info("now:{}".format(cmd))
    ret = os.system(cmd)
    if not ret:
        cmd = "mv {} {}".format(filename_download, filename)
        os.system(cmd)
        return True
    return False


class SimpleMultiProcesser:
    """
    Multiprocessing helper class.
    Purpose: produce jobs in a single process and consume them with multiple
    worker processes; no return value is required.
    """

    def __init__(self, data_path, worker_num=1, timeout=10):
        self._worker_num = worker_num
        self._res = []
        self._timeout = timeout
        self._data_path = data_path

    @func_run_time
    def load_data(self):
        """
        Data loading function; must return a list.
        :return:
        """
        return []

    @func_run_time
    def processer(self, single_job):
        """
        Handle a single item of the job list.
        :param single_job:
        :return:
        """
        pass

    def task_error_callback(self, msg):
        logging.error(msg)

    @func_run_time
    def process(self):
        tp_queue = self.load_data()
        logging.info("process -- queue_size={} worker_num={} timeout={}"
                     .format(len(tp_queue), self._worker_num, self._timeout))
        res = []
        pool = mp.Pool(processes=self._worker_num)
        while len(tp_queue) > 0:
            job = tp_queue.pop()
            ret = pool.apply_async(self.processer, args=(job,),
                                   error_callback=self.task_error_callback)
            res.append(ret)
        pool.close()
        pool.join()
        for i in res:
            self._res.append(i.get(timeout=self._timeout))

    def get_res_data(self):
        return self._res
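

# A minimal usage sketch, assuming a cached id file "rid_uid.csv" and an
# existing output directory "./recordings" (both names are illustrative, not
# part of the module): subclass SimpleMultiProcesser so that load_data() feeds
# recording ids from the cache and processer() downloads each one.
class RecordingDownloader(SimpleMultiProcesser):

    def load_data(self):
        # each cached line is "recording_id,user_id,label...", see parse_line()
        return [rid for rid, _uid, _label in get_recording_cache(self._data_path)]

    def processer(self, single_job):
        return download_mp4("./recordings", single_job)


if __name__ == '__main__':
    downloader = RecordingDownloader("rid_uid.csv", worker_num=4, timeout=600)
    downloader.process()
    logging.info("downloaded {} recordings".format(len(downloader.get_res_data())))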