Page MenuHomePhabricator

common.py
No OneTemporary

common.py

# -*- coding: utf-8 -*-
import logging
import os
import pymysql
from datetime import datetime, timedelta
import subprocess
import logging
import multiprocessing as mp
import time
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
def write_file(filename, data):
    """Write each item of *data* to *filename*, one item per line (overwrites).

    Items are str()-converted and stripped of surrounding newlines so the
    output contains exactly one trailing newline per item.
    """
    with open(filename, 'w') as out:
        out.writelines(str(item).strip('\n') + "\n" for item in data)
def read_file(filename):
    """Read *filename* and return its lines as a list, whitespace-stripped.

    Blank lines are kept (as empty strings), matching the original
    readline-loop behavior; only EOF terminates reading.

    :param filename: path of the text file to read
    :return: list of stripped lines
    """
    # Idiomatic fix: iterate the file object directly instead of the manual
    # readline()/break loop — identical semantics, less code.
    with open(filename, 'r') as f:
        return [line.strip() for line in f]
def n_days_ago(n_time, days):
    """Return the date *days* days before *n_time*.

    :param n_time: anchor date as a 'YYYYMMDD' string, e.g. 20180719
    :param days: number of days to step back
    :return: resulting date as a 'YYYYMMDD' string
    """
    anchor = datetime.strptime(n_time, '%Y%m%d')
    return (anchor - timedelta(days=days)).strftime("%Y%m%d")
def connect_db(host="research-db-r1.starmaker.co", port=3306, user="root", passwd="Qrdl1130", db="rec"):
    """Open a pymysql connection to the given MySQL database.

    NOTE(review): credentials are hard-coded defaults — they should be moved
    to configuration / environment variables.

    :return: an open pymysql Connection
    """
    # Security fix: never write the password to the log.
    logging.info("connect mysql host={} port={} user={} passwd=*** db={}".format(host, port, user, db))
    return pymysql.connect(host=host, port=port, user=user, passwd=passwd, db=db)
def get_data_by_sql(sql):
    """Execute *sql* against the default database and return all rows.

    :param sql: SQL statement to execute (already fully formatted)
    :return: tuple of result rows as returned by cursor.fetchall()
    """
    db = connect_db()
    try:
        db_cursor = db.cursor()
        try:
            # Truncate very long statements so the log stays readable.
            if len(sql) < 100:
                logging.info("execute = {}".format(sql))
            else:
                logging.info("execute = {}...".format(sql[:100]))
            db_cursor.execute(sql)
            res = db_cursor.fetchall()
        finally:
            # Bug fix: cursor/connection leaked when execute() raised.
            db_cursor.close()
    finally:
        db.close()
    logging.info("res size={}".format(len(res)))
    return res
def get_recording_msg_batch(filename=None):
    """Fetch (recording_id, user_id, sm_labels) rows from the recording table in batches.

    Pages forward on r_id so each query resumes where the previous one
    stopped. Optionally dumps the rows to *filename*, one comma-joined
    row per line.

    :param filename: optional output file for the raw rows
    :return: list of (r_id, r_user_id, sm_labels) tuples
    """
    collected = []
    page_size = 100000
    query_tpl = "select r_id, r_user_id,sm_labels from recording where r_id > {} and sm_labels like \"%male%\" order by r_id asc limit {}"
    last_seen_id = 0
    while True:
        rows = get_data_by_sql(query_tpl.format(last_seen_id, page_size))
        if not rows:
            break
        # r_id of the last row becomes the cursor for the next page.
        last_seen_id = rows[-1][0]
        collected.extend(rows)
        logging.info("------current_size size={}".format(len(collected)))
    if filename:
        write_file(filename, [",".join(map(str, row)) for row in collected])
    return collected
def parse_label(label):
    """Extract a gender tag from a free-form label string.

    :param label: arbitrary label text (case-insensitive match)
    :return: "female" or "male" when exactly one is present, "" otherwise
    """
    text = str(label).lower()
    if "female" in text:
        # Strip every "female" occurrence; if "male" still remains the label
        # mentions both genders, so discard it.
        remainder = text.replace("female", "")
        return "" if "male" in remainder else "female"
    return "male" if "male" in text else ""
def parse_labels(rid_uid_label, filename=None):
    """Keep only rows whose label resolves to an unambiguous gender.

    :param rid_uid_label: iterable of (recording_id, user_id, label) triples
    :param filename: optional output file for the filtered rows
    :return: list of (recording_id, user_id, gender) triples
    """
    tagged = ((rid, uid, parse_label(label)) for rid, uid, label in rid_uid_label)
    kept = [row for row in tagged if row[2]]
    if filename:
        write_file(filename, [",".join(map(str, row)) for row in kept])
    return kept
def parse_line(x):
    """Split one cached CSV line into (rid, uid, label).

    The label may itself contain commas, so everything after the second
    comma is rejoined into a single field.
    """
    fields = str(x).strip().split(',')
    rid, uid = fields[0], fields[1]
    return rid, uid, ",".join(fields[2:])
def get_recording_cache(filename=None):
    """Return recording rows, preferring the on-disk cache when available.

    :param filename: cache file path; when falsy, rows are fetched from the
        database instead.
    :return: list of (rid, uid, label) triples
    """
    if not filename:
        # No cache file given — hit the database.
        return get_recording_msg_batch(filename)
    return [parse_line(line) for line in read_file(filename)]
def func_run_time(func):
    """Decorator: log the wall-clock runtime of *func*.

    Bug fix: the original wrapper discarded the wrapped function's return
    value, so any decorated function effectively returned None (which made
    SimpleMultiProcesser.process() crash on len(self.load_data())). The
    result is now passed through.
    """
    from functools import wraps

    @wraps(func)  # preserve __name__/__doc__ for logging and debugging
    def wrapper(*args, **kw):
        start = time.time()
        result = func(*args, **kw)
        logging.info('current Function [%s] run time is %.2f' % (func.__name__, time.time() - start))
        return result
    return wrapper
def download_mp4(dir, recording_id):
    """Download the dry-voice master for *recording_id* into *dir* via coscmd.

    Downloads to a ".download" temp name first and renames to ".mp4" only on
    success, so a partially written file is never mistaken for a finished one.

    :param dir: target directory
    :param recording_id: recording id, used as the file basename
    :return: True when the download succeeded, False otherwise
    """
    file_path = os.path.join(dir, recording_id)
    filename_download = file_path + ".download"
    filename = file_path + ".mp4"
    # Drop any stale partial download left by a previous attempt.
    if os.path.exists(filename_download):
        os.unlink(filename_download)
    remote = "production/uploading/recordings/{}/origin_master.mp4".format(recording_id)
    # Argument-list form avoids shell interpolation of recording_id
    # (was a shell string passed to os.system).
    ret = subprocess.call(
        ["coscmd", "-b", "starmaker-1256122840", "download", remote, filename_download])
    if ret == 0:  # exit code 0 == success
        # Atomic in-process rename instead of shelling out to `mv`.
        os.replace(filename_download, filename)
        return True
    return False
class SimpleMultiProcesser:
    """
    Simple multiprocessing harness.

    Purpose: single-process producer, multi-process consumers; return values
    are collected after all jobs finish. Subclasses override load_data()
    (build the job list) and processer() (handle one job).
    """
    def __init__(self, data_path, worker_num=1, timeout=10):
        # worker_num: size of the worker pool used by process()
        self._worker_num = worker_num
        # _res: return values collected from processer() calls
        self._res = []
        # timeout: per-result timeout (seconds) passed to AsyncResult.get()
        self._timeout = timeout
        # data_path: input location; not read here, available to load_data() overrides
        self._data_path = data_path
    @func_run_time
    def load_data(self):
        """
        Load the input data; must return a list of jobs.
        Override in subclasses (base implementation returns an empty list).
        :return: list of jobs for process() to distribute
        """
        return []
    @func_run_time
    def processer(self, single_job):
        """
        Process a single item from the job list. Override in subclasses.
        :param single_job: one element of the list returned by load_data()
        :return:
        """
        pass
    def task_error_callback(self, msg):
        # Invoked by the pool when a worker task raises; just log the error.
        logging.error(msg)
    @func_run_time
    def process(self):
        # NOTE(review): func_run_time's wrapper (as written above) does not
        # return the wrapped function's value, so self.load_data() evaluates
        # to None here and len(tp_queue) raises TypeError — confirm the
        # decorator passes the return value through before relying on this.
        tp_queue = self.load_data()
        logging.info("process -- queue_size={} worker_num={} timeout={}".format(len(tp_queue), self._worker_num, self._timeout))
        res = []
        pool = mp.Pool(processes=self._worker_num)
        # Submit every job asynchronously; pop() drains the list from the
        # tail, so jobs run in reverse of load_data() order.
        while len(tp_queue) > 0:
            job = tp_queue.pop()
            ret = pool.apply_async(self.processer, args=(job, ), error_callback=self.task_error_callback)
            res.append(ret)
        pool.close()
        pool.join()
        # The pool is already joined, so get() should not block for long;
        # the timeout is a safety net.
        for i in res:
            self._res.append(i.get(timeout=self._timeout))
    def get_res_data(self):
        # Results gathered by the last process() call (in submission order).
        return self._res

File Metadata

Mime Type
text/x-python
Expires
Sat, Jun 21, 16:57 (1 d, 11 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1448238
Default Alt Text
common.py (5 KB)

Event Timeline