diff --git a/AIMeiSheng/docker_demo/common.py b/AIMeiSheng/docker_demo/common.py index 3eff425..25d701e 100644 --- a/AIMeiSheng/docker_demo/common.py +++ b/AIMeiSheng/docker_demo/common.py @@ -1,61 +1,93 @@ import os import time import logging import urllib, urllib.request gs_tmp_dir = "/tmp/ai_meisheng_tmp" gs_model_dir = "/tmp/ai_meisheng_models" gs_resource_cache_dir = "/tmp/ai_meisheng_resource_cache" gs_svc_model_path = os.path.join(gs_model_dir, "weights/xusong_v2_org_version_alldata_embed1_enzx_diff_fi_e15_s244110.pth") gs_embed_model_path = os.path.join(gs_model_dir, "RawNet3/models/weights/model.pt") gs_hubert_model_path = os.path.join(gs_model_dir, "hubert.pt") gs_rmvpe_model_path = os.path.join(gs_model_dir, "rmvpe.pt") +# errcode +gs_err_code_success = 0 +gs_err_code_download_vocal = 100 +gs_err_code_download_svc_url = 101 +gs_err_code_svc_process = 102 +gs_err_code_transcode = 103 +gs_err_code_volume_adjust = 104 +gs_err_code_upload = 105 +gs_err_code_params = 106 +gs_err_code_pending = 107 +gs_err_code_too_many_connections = 429 + +gs_redis_conf = { + "host": "av-credis.starmaker.co", + "port": 6379, + "pwd": "lKoWEhz%jxTO", +} + +gs_server_redis_conf = { + "producer": "ai_meisheng_producer", # 输入的队列 + "ai_meisheng_key_prefix": "ai_meisheng_key_", # 存储结果情况 +} + def download2disk(url, dst_path): st = time.time() urllib.request.urlretrieve(url, dst_path) print(f"download {url} -> {dst_path} sp = {time.time() - st}") return os.path.exists(dst_path) def exec_cmd(cmd): # gs_logger.info(cmd) print(cmd) ret = os.system(cmd) if ret != 0: return False return True def exec_cmd_and_result(cmd): r = os.popen(cmd) text = r.read() r.close() return text def upload_file2cos(key, file_path, region='ap-singapore', bucket_name='av-audit-sync-sg-1256122840'): """ 将文件上传到cos :param key: 桶上的具体地址 :param file_path: 本地文件地址 :param region: 区域 :param bucket_name: 桶地址 :return: """ gs_coscmd = "coscmd" gs_coscmd_conf = "~/.cos.conf" cmd = "{} -c {} -r {} -b {} upload {} {}".format(gs_coscmd, gs_coscmd_conf, region, bucket_name, file_path, key) if exec_cmd(cmd): cmd = "{} -c {} -r {} -b {} info {}".format(gs_coscmd, gs_coscmd_conf, region, bucket_name, key) \ + "| grep Content-Length |awk \'{print $2}\'" res_str = exec_cmd_and_result(cmd) logging.info("{},res={}".format(key, res_str)) size = float(res_str) if size > 0: return True return False return False + + +def check_input(input_data): + key_list = ["record_song_url", "target_url", "start", "end", "vocal_loudness", "female_recording_url", + "male_recording_url"] + for key in key_list: + if key not in input_data.keys(): + return False + return True diff --git a/AIMeiSheng/docker_demo/http_server.py b/AIMeiSheng/docker_demo/http_server.py index a943980..ffa160e 100644 --- a/AIMeiSheng/docker_demo/http_server.py +++ b/AIMeiSheng/docker_demo/http_server.py @@ -1,84 +1,84 @@ # -*- coding: UTF-8 -*- """ SVC处理逻辑 1. 根据跟定的vocal_url 判别男女 2. 根据男女信息选择适合的男女url 3. 模型推理 """ import gc import os import sys + +sys.path.append(os.path.dirname(__file__)) +sys.path.append(os.path.join(os.path.dirname(__file__), "../")) + import json import time import socket import logging import hashlib from flask import Flask, jsonify, request, abort from redis_helper import RedisHelper -from offline_server import (gs_server_redis_conf, gs_redis_conf, check_input, gs_err_code_pending, - gs_err_code_params, gs_err_code_too_many_connections) - -sys.path.append(os.path.dirname(__file__)) -sys.path.append(os.path.join(os.path.dirname(__file__), "../")) +from common import * # 全局设置 hostname = socket.gethostname() log_file_name = f"/tmp/av_meisheng_http_{hostname}.log" logging.basicConfig(filename=log_file_name, format='%(asctime)s %(levelname)s %(message)s', datefmt='%Y-%m-%d %I:%M:%S', level=logging.INFO) app = Flask(__name__) class HttpServer: def __init__(self, redis_conf, server_conf): self.redis_helper = RedisHelper(redis_conf) self.server_conf = server_conf def process(self, in_data): msg = { "status": gs_err_code_params, "schedule": 100, "gender": "unknown", "target_song_url": "", } if not check_input(in_data): return msg if self.redis_helper.llen(self.server_conf["producer"]) > 10: msg["status"] = gs_err_code_too_many_connections return msg distinct_id = hashlib.md5(in_data["record_song_url"].encode()).hexdigest() distinct_key = self.server_conf["ai_meisheng_key_prefix"] + distinct_id if not self.redis_helper.exists(distinct_key): msg["status"] = gs_err_code_pending self.redis_helper.set(distinct_key, json.dumps(msg)) self.redis_helper.lpush(self.server_conf["producer"], json.dumps(in_data)) self.redis_helper.expire(distinct_key, 15) msg = self.redis_helper.get(distinct_key) return json.loads(msg) gs_http_server = HttpServer(gs_redis_conf, gs_server_redis_conf) @app.route("/ai_meisheng", methods=["POST"]) def ai_meisheng(): data = request.json st = time.time() logging.info(f"ai_meisheng:in:{data}") msg = gs_http_server.process(data) json_msg = jsonify(msg) logging.info(f"ai_meisheng:out:{data}-{json_msg}, sp={time.time() - st}") return json_msg if __name__ == "__main__": app.run(host='0.0.0.0', port=5000, threaded=False) diff --git a/AIMeiSheng/docker_demo/offline_server.py b/AIMeiSheng/docker_demo/offline_server.py index 134f7f9..8aff81d 100644 --- a/AIMeiSheng/docker_demo/offline_server.py +++ b/AIMeiSheng/docker_demo/offline_server.py @@ -1,189 +1,157 @@ # -*- coding: UTF-8 -*- """ 离线处理: 使用redis进行交互,从redis中获取数据资源,在将结果写入到redis """ import os import sys import time import json import socket import hashlib import logging from redis_helper import RedisHelper -from common import download2disk, exec_cmd, upload_file2cos +from common import * from svc_online import GSWorkerAttr, SVCOnline, volume_adjustment hostname = socket.gethostname() log_file_name = f"/tmp/av_meisheng_{hostname}.log" logging.basicConfig(filename=log_file_name, format='%(asctime)s %(levelname)s %(message)s', datefmt='%Y-%m-%d %I:%M:%S', level=logging.INFO) -# errcode -gs_err_code_success = 0 -gs_err_code_download_vocal = 100 -gs_err_code_download_svc_url = 101 -gs_err_code_svc_process = 102 -gs_err_code_transcode = 103 -gs_err_code_volume_adjust = 104 -gs_err_code_upload = 105 -gs_err_code_params = 106 -gs_err_code_pending = 107 -gs_err_code_too_many_connections = 429 - sys.path.append(os.path.dirname(__file__)) sys.path.append(os.path.join(os.path.dirname(__file__), "../")) -gs_redis_conf = { - "host": "av-credis.starmaker.co", - "port": 6379, - "pwd": "lKoWEhz%jxTO", -} - -gs_server_redis_conf = { - "producer": "ai_meisheng_producer", # 输入的队列 - "ai_meisheng_key_prefix": "ai_meisheng_key_", # 存储结果情况 -} - def download_data(worker_attr): vocal_path = os.path.join(worker_attr.tmp_dir, worker_attr.distinct_id) if os.path.exists(vocal_path): os.remove(vocal_path) st = time.time() if not download2disk(worker_attr.vocal_url, worker_attr.vocal_path): return gs_err_code_download_vocal logging.info(f"download vocal_url={worker_attr.vocal_url} sp = {time.time() - st}") # download svc_source_url if not os.path.exists(worker_attr.female_svc_source_path): st = time.time() if not download2disk(worker_attr.female_svc_source_url, worker_attr.female_svc_source_path): return gs_err_code_download_svc_url logging.info(f"download female_url={worker_attr.female_svc_source_url} sp = {time.time() - st}") # download svc_source_url if not os.path.exists(worker_attr.male_svc_source_path): st = time.time() if not download2disk(worker_attr.male_svc_source_url, worker_attr.male_svc_source_path): return gs_err_code_download_svc_url logging.info(f"download male_url={worker_attr.male_svc_source_url} sp = {time.time() - st}") return gs_err_code_success def transcode(wav_path, dst_path): st = time.time() cmd = f"ffmpeg -i {wav_path} -ar 44100 -ac 2 -b:a 64k -y {dst_path} -loglevel fatal" exec_cmd(cmd) logging.info(f"transcode cmd={cmd}, sp = {time.time() - st}") return os.path.exists(dst_path) -def check_input(input_data): - key_list = ["record_song_url", "target_url", "start", "end", "vocal_loudness", "female_recording_url", - "male_recording_url"] - for key in key_list: - if key not in input_data.keys(): - return False - return True - - class OfflineServer: def __init__(self, redis_conf, server_conf, update_redis=False): self.redis_helper = RedisHelper(redis_conf) self.svc_online = SVCOnline() self.server_conf = server_conf self.distinct_key = server_conf["ai_meisheng_key_prefix"] self.update_redis = update_redis def exists(self): return self.redis_helper.exists(self.distinct_key) def update_result(self, errcode, schedule, gender, target_song_url): msg = { "status": errcode, "schedule": schedule, "gender": gender, "target_song_url": target_song_url, } # 结果保存15min if self.update_redis: self.redis_helper.set(self.distinct_key, json.dumps(msg)) self.redis_helper.expire(self.distinct_key, 60 * 10) def process_one(self, worker_attr): self.distinct_key = self.server_conf["ai_meisheng_key_prefix"] + worker_attr.distinct_id logging.info(f"{worker_attr.log_info_name()}, start download ...") err = download_data(worker_attr) if err != gs_err_code_success: self.update_result(err, 100, "unknown", worker_attr.target_url) return err, None, None self.update_result(err, 35, "unknown", worker_attr.target_url) logging.info(f"{worker_attr.log_info_name()}, start process ...") gender = self.svc_online.process(worker_attr) if not os.path.exists(worker_attr.target_wav_path): self.update_result(gs_err_code_svc_process, 100, gender, worker_attr.target_url) return gs_err_code_svc_process, None, None self.update_result(err, 85, gender, worker_attr.target_url) # 音量拉伸到指定响度 logging.info(f"{worker_attr.log_info_name()}, start volume_adjustment ...") volume_adjustment(worker_attr.target_wav_path, worker_attr.target_loudness, worker_attr.target_wav_ad_path) if not os.path.exists(worker_attr.target_wav_ad_path): self.update_result(gs_err_code_volume_adjust, 100, gender, worker_attr.target_url) return gs_err_code_volume_adjust, None, None self.update_result(err, 90, gender, worker_attr.target_url) # transcode logging.info(f"{worker_attr.log_info_name()}, start transcode ...") if not transcode(worker_attr.target_wav_path, worker_attr.target_path): self.update_result(gs_err_code_transcode, 100, gender, worker_attr.target_url) return gs_err_code_transcode, None, None self.update_result(err, 95, gender, worker_attr.target_url) # upload logging.info(f"{worker_attr.log_info_name()}, start upload_file2cos ...") st = time.time() # 从target_url 分离出bucket_name,ap,和key # "http://starmaker-sv-1256122840.cos.na-siliconvalley.myqcloud.com/production/ai_voice/7036874317774285/xxalkdjfladjflkasdf-target.mp4", bucket_name = worker_attr.target_url.split(".")[0].split("//")[-1] region = worker_attr.target_url.split(".")[2] key = "/".join(worker_attr.target_url.split("/")[3:]) logging.info(f"{worker_attr.log_info_name()}, start upload_file2cos {bucket_name}, {region}, {key}") if not upload_file2cos(key, worker_attr.target_path, region=region, bucket_name=bucket_name): self.update_result(gs_err_code_upload, 100, gender, worker_attr.target_url) return gs_err_code_upload, None, None self.update_result(gs_err_code_success, 100, gender, worker_attr.target_url) logging.info(f"{worker_attr.log_info_name()} upload {worker_attr.target_url} sp = {time.time() - st}") return gs_err_code_success, worker_attr.target_url, gender def process(self): while True: data = self.redis_helper.rpop(self.server_conf["producer"]) if data is None: time.sleep(1) continue data = json.loads(data) if not check_input(data): logging.error(f"input data error={data}") continue worker_attr = GSWorkerAttr(data) self.distinct_key = self.server_conf["ai_meisheng_key_prefix"] + worker_attr.distinct_id if not self.exists(): logging.warning(f"input {data}, timeout abandon ....") continue st = time.time() errcode, target_path, gender = self.process_one(worker_attr) self.update_result(errcode, 100, gender, target_path) logging.info(f"{worker_attr.log_info_name()} finish sp = {time.time() - st}") if __name__ == '__main__': offline_server = OfflineServer(gs_redis_conf, gs_server_redis_conf, True) offline_server.process()